This is an automated email from the ASF dual-hosted git repository.
meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fe75ff8 [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation
fe75ff8 is described below
commit fe75ff8bea3330a10aba1a61f3aba42e541195a8
Author: HyukjinKwon <[email protected]>
AuthorDate: Fri Jul 5 10:08:22 2019 -0700
[SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation
## What changes were proposed in this pull request?
It looks like we have generated the PySpark API documentation with Epydoc
almost from the very beginning (see
https://github.com/apache/spark/commit/85b8f2c64f0fc4be5645d8736629fc082cb3587b).
This fixes an actual issue:
Before:

After:

This appears to be a bug in the `epytext` plugin during the conversion
between `param` and `:param` syntax. See also the [Epydoc
syntax](http://epydoc.sourceforge.net/manual-epytext.html) manual.
In addition, the Epydoc syntax violates
[PEP-257](https://www.python.org/dev/peps/pep-0257/) (IIRC) and prevents us
from enabling some doctest linter rules as well.
We should remove this legacy, and Spark 3 is a good time to do it.
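
For reference, the conversion applied throughout this patch maps Epydoc markup
onto plain reST, for example (docstring lines taken from the diff below;
illustrative only):

    Epydoc (before):
        :param conf: A L{SparkConf} object setting Spark properties.
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
    reST (after):
        :param conf: A :class:`SparkConf` object setting Spark properties.
        Persist this RDD with the default storage level (`MEMORY_ONLY`).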
## How was this patch tested?
Manually built the documentation and checked each change.
I had to find the remaining Epydoc syntax manually, for instance with
`git grep -r "{L"`.
Closes #25060 from HyukjinKwon/SPARK-28206.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>
---
python/docs/conf.py | 1 -
python/docs/epytext.py | 30 --------
python/pyspark/accumulators.py | 14 ++--
python/pyspark/broadcast.py | 6 +-
python/pyspark/conf.py | 8 +--
python/pyspark/context.py | 56 +++++++--------
python/pyspark/files.py | 7 +-
python/pyspark/ml/feature.py | 2 +-
python/pyspark/ml/linalg/__init__.py | 8 +--
python/pyspark/mllib/classification.py | 4 +-
python/pyspark/mllib/clustering.py | 6 +-
python/pyspark/mllib/linalg/__init__.py | 8 +--
python/pyspark/mllib/random.py | 6 +-
python/pyspark/mllib/stat/_statistics.py | 4 +-
python/pyspark/mllib/util.py | 4 +-
python/pyspark/rdd.py | 114 +++++++++++++++----------------
python/pyspark/serializers.py | 12 ++--
python/pyspark/sql/dataframe.py | 10 +--
python/pyspark/sql/types.py | 2 +-
python/pyspark/streaming/context.py | 42 ++++++------
python/pyspark/streaming/dstream.py | 50 +++++++-------
python/pyspark/taskcontext.py | 2 +-
python/pyspark/testing/streamingutils.py | 6 +-
23 files changed, 185 insertions(+), 217 deletions(-)
diff --git a/python/docs/conf.py b/python/docs/conf.py
index f507ee3..9e7afb7 100644
--- a/python/docs/conf.py
+++ b/python/docs/conf.py
@@ -31,7 +31,6 @@ needs_sphinx = '1.2'
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.viewcode',
- 'epytext',
'sphinx.ext.mathjax',
]
diff --git a/python/docs/epytext.py b/python/docs/epytext.py
deleted file mode 100644
index 4bbbf65..0000000
--- a/python/docs/epytext.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import re
-
-RULES = (
- (r"<(!BLANKLINE)[\w.]+>", r""),
- (r"L{([\w.()]+)}", r":class:`\1`"),
- (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
- (r"C{([\w.()]+)}", r":class:`\1`"),
- (r"[IBCM]{([^}]+)}", r"`\1`"),
- ('pyspark.rdd.RDD', 'RDD'),
-)
-
-
-def _convert_epytext(line):
- """
- >>> _convert_epytext("L{A}")
- :class:`A`
- """
- line = line.replace('@', ':')
- for p, sub in RULES:
- line = re.sub(p, sub, line)
- return line
-
-
-def _process_docstring(app, what, name, obj, options, lines):
- for i in range(len(lines)):
- lines[i] = _convert_epytext(lines[i])
-
-
-def setup(app):
- app.connect("autodoc-process-docstring", _process_docstring)
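
For context, a minimal, self-contained sketch of what the removed hook did to a
single docstring line (the rules below are a subset copied from the deleted
module; the sample line is illustrative only):

    import re

    # Subset of the removed RULES, applied the same way _convert_epytext did.
    rules = (
        (r"L{([\w.()]+)}", r":class:`\1`"),
        (r"C{([\w.()]+)}", r":class:`\1`"),
        (r"[IBCM]{([^}]+)}", r"`\1`"),
    )

    def convert(line):
        line = line.replace("@", ":")   # @param -> :param, @return -> :return, ...
        for pattern, repl in rules:
            line = re.sub(pattern, repl, line)
        return line

    print(convert("@param conf: A L{SparkConf} object"))
    # prints: :param conf: A :class:`SparkConf` object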
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 00ec094..a5d5132 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -123,13 +123,13 @@ class Accumulator(object):
"""
A shared variable that can be accumulated, i.e., has a commutative and associative "add"
- operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
- operator, but only the driver program is allowed to access its value, using C{value}.
+ operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=`
+ operator, but only the driver program is allowed to access its value, using `value`.
Updates from the workers get propagated automatically to the driver program.
- While C{SparkContext} supports accumulators for primitive data types like C{int} and
- C{float}, users can also define accumulators for custom types by providing a custom
- L{AccumulatorParam} object. Refer to the doctest of this module for an example.
+ While :class:`SparkContext` supports accumulators for primitive data types like :class:`int` and
+ :class:`float`, users can also define accumulators for custom types by providing a custom
+ :class:`AccumulatorParam` object. Refer to the doctest of this module for an example.
"""
def __init__(self, aid, value, accum_param):
@@ -185,14 +185,14 @@ class AccumulatorParam(object):
def zero(self, value):
"""
Provide a "zero value" for the type, compatible in dimensions with the
- provided C{value} (e.g., a zero vector)
+ provided `value` (e.g., a zero vector)
"""
raise NotImplementedError
def addInPlace(self, value1, value2):
"""
Add two values of the accumulator's data type, returning a new value;
- for efficiency, can also update C{value1} in place and return it.
+ for efficiency, can also update `value1` in place and return it.
"""
raise NotImplementedError
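
As a quick illustration of the accumulator API documented above (a minimal
sketch, assuming an active SparkContext `sc`):

    acc = sc.accumulator(0)   # int accumulator using the default AccumulatorParam
    sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
    print(acc.value)          # 10; only the driver may read the value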
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index cca64b5..a97d409 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -49,8 +49,8 @@ def _from_id(bid):
class Broadcast(object):
"""
- A broadcast variable created with L{SparkContext.broadcast()}.
- Access its value through C{.value}.
+ A broadcast variable created with :meth:`SparkContext.broadcast`.
+ Access its value through :attr:`value`.
Examples:
@@ -69,7 +69,7 @@ class Broadcast(object):
def __init__(self, sc=None, value=None, pickle_registry=None, path=None,
sock_file=None):
"""
- Should not be called directly by users -- use L{SparkContext.broadcast()}
+ Should not be called directly by users -- use :meth:`SparkContext.broadcast`
instead.
"""
if sc is not None:
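
A minimal usage sketch of the broadcast API referenced above (assumes an active
SparkContext `sc`):

    b = sc.broadcast([1, 2, 3])        # created on the driver
    print(b.value)                     # [1, 2, 3]
    sc.parallelize([0, 0]).map(lambda _: sum(b.value)).collect()   # [6, 6]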
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index ab429d9..2024260 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -79,16 +79,16 @@ class SparkConf(object):
parameters as key-value pairs.
Most of the time, you would create a SparkConf object with
- C{SparkConf()}, which will load values from C{spark.*} Java system
+ ``SparkConf()``, which will load values from `spark.*` Java system
properties as well. In this case, any parameters you set directly on
- the C{SparkConf} object take priority over system properties.
+ the :class:`SparkConf` object take priority over system properties.
- For unit tests, you can also call C{SparkConf(false)} to skip
+ For unit tests, you can also call ``SparkConf(false)`` to skip
loading external settings and get the same configuration no matter
what the system properties are.
All setter methods in this class support chaining. For example,
- you can write C{conf.setMaster("local").setAppName("My app")}.
+ you can write ``conf.setMaster("local").setAppName("My app")``.
.. note:: Once a SparkConf object is passed to Spark, it is cloned
and can no longer be modified by the user.
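
For example, the chained setter style mentioned in the docstring (a sketch; the
master URL and application name are placeholders):

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local").setAppName("My app")
    sc = SparkContext(conf=conf)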
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a835298..69020e6 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -61,7 +61,7 @@ class SparkContext(object):
"""
Main entry point for Spark functionality. A SparkContext represents the
- connection to a Spark cluster, and can be used to create L{RDD} and
+ connection to a Spark cluster, and can be used to create :class:`RDD` and
broadcast variables on that cluster.
.. note:: Only one :class:`SparkContext` should be active per JVM. You must `stop()`
@@ -86,7 +86,7 @@ class SparkContext(object):
gateway=None, jsc=None, profiler_cls=BasicProfiler):
"""
Create a new SparkContext. At least the master and app name should be set,
- either through the named parameters here or through C{conf}.
+ either through the named parameters here or through `conf`.
:param master: Cluster URL to connect to
(e.g. mesos://host:port, spark://host:port, local[4]).
@@ -102,7 +102,7 @@ class SparkContext(object):
the batch size based on object sizes, or -1 to use an unlimited
batch size
:param serializer: The serializer for RDDs.
- :param conf: A L{SparkConf} object setting Spark properties.
+ :param conf: A :class:`SparkConf` object setting Spark properties.
:param gateway: Use an existing gateway and JVM, otherwise a new JVM
will be instantiated.
:param jsc: The JavaSparkContext instance (optional).
@@ -576,7 +576,7 @@ class SparkContext(object):
def pickleFile(self, name, minPartitions=None):
"""
- Load an RDD previously saved using L{RDD.saveAsPickleFile} method.
+ Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method.
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
@@ -624,20 +624,24 @@ class SparkContext(object):
as `utf-8`), which is faster and smaller than unicode. (Added in
Spark 1.2)
- For example, if you have the following files::
+ For example, if you have the following files:
- hdfs://a-hdfs-path/part-00000
- hdfs://a-hdfs-path/part-00001
- ...
- hdfs://a-hdfs-path/part-nnnnn
+ .. code-block:: text
- Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
- then C{rdd} contains::
+ hdfs://a-hdfs-path/part-00000
+ hdfs://a-hdfs-path/part-00001
+ ...
+ hdfs://a-hdfs-path/part-nnnnn
+
+ Do ``rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")``,
+ then ``rdd`` contains:
- (a-hdfs-path/part-00000, its content)
- (a-hdfs-path/part-00001, its content)
- ...
- (a-hdfs-path/part-nnnnn, its content)
+ .. code-block:: text
+
+ (a-hdfs-path/part-00000, its content)
+ (a-hdfs-path/part-00001, its content)
+ ...
+ (a-hdfs-path/part-nnnnn, its content)
.. note:: Small files are preferred, as each file will be loaded
fully in memory.
@@ -705,7 +709,7 @@ class SparkContext(object):
and value Writable classes
2. Serialization is attempted via Pyrolite pickling
3. If this fails, the fallback is to call 'toString' on each key and value
- 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
+ 4. :class:`PickleSerializer` is used to deserialize pickled objects on the Python side
:param path: path to sequncefile
:param keyClass: fully qualified classname of key Writable class
@@ -872,8 +876,7 @@ class SparkContext(object):
def broadcast(self, value):
"""
- Broadcast a read-only variable to the cluster, returning a
- L{Broadcast<pyspark.broadcast.Broadcast>}
+ Broadcast a read-only variable to the cluster, returning a :class:`Broadcast`
object for reading it in distributed functions. The variable will
be sent to each cluster only once.
"""
@@ -881,8 +884,8 @@ class SparkContext(object):
def accumulator(self, value, accum_param=None):
"""
- Create an L{Accumulator} with the given initial value, using a given
- L{AccumulatorParam} helper object to define how to add values of the
+ Create an :class:`Accumulator` with the given initial value, using a given
+ :class:`AccumulatorParam` helper object to define how to add values of the
data type if provided. Default AccumulatorParams are used for integers
and floating-point numbers if you do not provide one. For other types,
a custom AccumulatorParam can be used.
@@ -902,12 +905,11 @@ class SparkContext(object):
def addFile(self, path, recursive=False):
"""
Add a file to be downloaded with this Spark job on every node.
- The C{path} passed can be either a local file, a file in HDFS
+ The `path` passed can be either a local file, a file in HDFS
(or other Hadoop-supported filesystems), or an HTTP, HTTPS or
FTP URI.
- To access the file in Spark jobs, use
- L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the
+ To access the file in Spark jobs, use :meth:`SparkFiles.get` with the
filename to find its download location.
A directory can be given if the recursive option is set to True.
@@ -932,7 +934,7 @@ class SparkContext(object):
def addPyFile(self, path):
"""
Add a .py or .zip dependency for all tasks to be executed on this
- SparkContext in the future. The C{path} passed can be either a local
+ SparkContext in the future. The `path` passed can be either a local
file, a file in HDFS (or other Hadoop-supported filesystems), or an
HTTP, HTTPS or FTP URI.
@@ -978,7 +980,7 @@ class SparkContext(object):
Application programmers can use this method to group all those jobs together and give a
group description. Once set, the Spark web UI will associate such jobs with this group.
- The application can use L{SparkContext.cancelJobGroup} to cancel all
+ The application can use :meth:`SparkContext.cancelJobGroup` to cancel all
running jobs in this group.
>>> import threading
@@ -1023,7 +1025,7 @@ class SparkContext(object):
def getLocalProperty(self, key):
"""
Get a local property set in this thread, or null if it is missing. See
- L{setLocalProperty}
+ :meth:`setLocalProperty`.
"""
return self._jsc.getLocalProperty(key)
@@ -1041,7 +1043,7 @@ class SparkContext(object):
def cancelJobGroup(self, groupId):
"""
- Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
+ Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`.
for more information.
"""
self._jsc.sc().cancelJobGroup(groupId)
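
A short sketch of the `wholeTextFiles` behavior documented earlier in this
file's changes (assumes an active SparkContext `sc`; the HDFS path is
illustrative):

    rdd = sc.wholeTextFiles("hdfs://a-hdfs-path")
    # each element is a (path, content) pair
    paths = rdd.keys().collect()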
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
index 797573f..c08db41 100644
--- a/python/pyspark/files.py
+++ b/python/pyspark/files.py
@@ -24,8 +24,7 @@ __all__ = ['SparkFiles']
class SparkFiles(object):
"""
- Resolves paths to files added through
- L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}.
+ Resolves paths to files added through :meth:`SparkContext.addFile`.
SparkFiles contains only classmethods; users should not create SparkFiles
instances.
@@ -41,7 +40,7 @@ class SparkFiles(object):
@classmethod
def get(cls, filename):
"""
- Get the absolute path of a file added through C{SparkContext.addFile()}.
+ Get the absolute path of a file added through :meth:`SparkContext.addFile`.
"""
path = os.path.join(SparkFiles.getRootDirectory(), filename)
return os.path.abspath(path)
@@ -50,7 +49,7 @@ class SparkFiles(object):
def getRootDirectory(cls):
"""
Get the root directory that contains files added through
- C{SparkContext.addFile()}.
+ :meth:`SparkContext.addFile`.
"""
if cls._is_running_on_worker:
return cls._root_directory
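
For example (a sketch; the file name is illustrative):

    from pyspark import SparkFiles

    sc.addFile("data.txt")             # hypothetical local file shipped to every node
    path = SparkFiles.get("data.txt")  # absolute download location on the current node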
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 9827a2a..78d0269 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2560,7 +2560,7 @@ class IndexToString(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,
corresponding string values.
The index-string mapping is either from the ML attributes of the input column,
or from user-supplied labels (which take precedence over ML attributes).
- See L{StringIndexer} for converting strings into indices.
+ See :class:`StringIndexer` for converting strings into indices.
.. versionadded:: 1.6.0
"""
diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py
index f6ddc09..a79d5e5 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -17,9 +17,9 @@
"""
MLlib utilities for linear algebra. For dense vectors, MLlib
-uses the NumPy C{array} type, so you can simply pass NumPy arrays
-around. For sparse vectors, users can construct a L{SparseVector}
-object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+uses the NumPy `array` type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a :class:`SparseVector`
+object from MLlib or pass SciPy `scipy.sparse` column vectors if
SciPy is available in their environment.
"""
@@ -758,7 +758,7 @@ class Vectors(object):
.. note:: Dense vectors are simply represented as NumPy array objects,
so there is no need to covert them for use in MLlib. For sparse
vectors,
the factory methods in this class create an MLlib-compatible type, or
users
- can pass in SciPy's C{scipy.sparse} column vectors.
+ can pass in SciPy's `scipy.sparse` column vectors.
"""
@staticmethod
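
A small sketch of the two vector kinds mentioned above:

    from pyspark.ml.linalg import Vectors

    dense = Vectors.dense([1.0, 0.0, 3.0])
    sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])   # size, indices, values
    sparse.toArray()                                 # array([1., 0., 3.])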
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2037be..c52da2a 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -659,11 +659,11 @@ class NaiveBayes(object):
Train a Naive Bayes model given an RDD of (label, features)
vectors.
- This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
+ This is the `Multinomial NB <http://tinyurl.com/lsdw6p>`_ which
can handle all kinds of discrete data. For example, by
converting documents into TF-IDF vectors, it can be used for
document classification. By making every vector a 0-1 vector,
- it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
+ it can also be used as `Bernoulli NB <http://tinyurl.com/p7c96j6>`_.
The input feature values must be nonnegative.
:param data:
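
A minimal training sketch for the multinomial NB described above (assumes an
active SparkContext `sc`; the toy data is illustrative):

    from pyspark.mllib.classification import NaiveBayes
    from pyspark.mllib.regression import LabeledPoint

    data = sc.parallelize([
        LabeledPoint(0.0, [1.0, 0.0]),
        LabeledPoint(1.0, [0.0, 1.0]),
    ])
    model = NaiveBayes.train(data)   # multinomial NB; feature values must be nonnegative
    model.predict([1.0, 0.0])        # 0.0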
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 58da434..3524fcf 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -130,9 +130,9 @@ class BisectingKMeans(object):
clusters, larger clusters get higher priority.
Based on
- U{http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf}
- Steinbach, Karypis, and Kumar, A comparison of document clustering
- techniques, KDD Workshop on Text Mining, 2000.
+ `Steinbach, Karypis, and Kumar, A comparison of document clustering
+ techniques, KDD Workshop on Text Mining, 2000
+ <http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf>`_.
.. versionadded:: 2.0.0
"""
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index df411d7..cd09621 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -17,9 +17,9 @@
"""
MLlib utilities for linear algebra. For dense vectors, MLlib
-uses the NumPy C{array} type, so you can simply pass NumPy arrays
-around. For sparse vectors, users can construct a L{SparseVector}
-object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+uses the NumPy `array` type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a :class:`SparseVector`
+object from MLlib or pass SciPy `scipy.sparse` column vectors if
SciPy is available in their environment.
"""
@@ -847,7 +847,7 @@ class Vectors(object):
.. note:: Dense vectors are simply represented as NumPy array objects,
so there is no need to covert them for use in MLlib. For sparse
vectors,
the factory methods in this class create an MLlib-compatible type, or
users
- can pass in SciPy's C{scipy.sparse} column vectors.
+ can pass in SciPy's `scipy.sparse` column vectors.
"""
@staticmethod
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index a8833cb..6106c58 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -54,8 +54,7 @@ class RandomRDDs(object):
To transform the distribution in the generated RDD from U(0.0, 1.0)
to U(a, b), use
- C{RandomRDDs.uniformRDD(sc, n, p, seed)\
- .map(lambda v: a + (b - a) * v)}
+ ``RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)``
:param sc: SparkContext used to create the RDD.
:param size: Size of the RDD.
@@ -85,8 +84,7 @@ class RandomRDDs(object):
To transform the distribution in the generated RDD from standard normal
to some other normal N(mean, sigma^2), use
- C{RandomRDDs.normal(sc, n, p, seed)\
- .map(lambda v: mean + sigma * v)}
+ ``RandomRDDs.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)``
:param sc: SparkContext used to create the RDD.
:param size: Size of the RDD.
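
For example, the U(0.0, 1.0) to U(a, b) transformation spelled out in the
docstring (a sketch, assuming an active SparkContext `sc`):

    from pyspark.mllib.random import RandomRDDs

    a, b = 2.0, 5.0
    rdd = RandomRDDs.uniformRDD(sc, 100).map(lambda v: a + (b - a) * v)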
diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py
index 6e89bfd..d49f741 100644
--- a/python/pyspark/mllib/stat/_statistics.py
+++ b/python/pyspark/mllib/stat/_statistics.py
@@ -98,10 +98,10 @@ class Statistics(object):
"""
Compute the correlation (matrix) for the input RDD(s) using the
specified method.
- Methods currently supported: I{pearson (default), spearman}.
+ Methods currently supported: `pearson (default), spearman`.
If a single RDD of Vectors is passed in, a correlation matrix
- comparing the columns in the input RDD is returned. Use C{method=}
+ comparing the columns in the input RDD is returned. Use `method`
to specify the method to be used for single RDD inout.
If two RDDs of floats are passed in, a single float is returned.
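
A small sketch of the two-RDD form mentioned above (assumes an active
SparkContext `sc`):

    from pyspark.mllib.stat import Statistics

    x = sc.parallelize([1.0, 2.0, 3.0])
    y = sc.parallelize([2.0, 4.0, 6.0])
    Statistics.corr(x, y, method="pearson")   # approximately 1.0 for perfectly linear data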
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 0190bf3..1a0ce42 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -95,7 +95,7 @@ class MLUtils(object):
which leads to inconsistent feature
dimensions.
:param minPartitions: min number of partitions
- @return: labeled data stored as an RDD of LabeledPoint
+ :return: labeled data stored as an RDD of LabeledPoint
>>> from tempfile import NamedTemporaryFile
>>> from pyspark.mllib.util import MLUtils
@@ -156,7 +156,7 @@ class MLUtils(object):
:param path: file or directory path in any Hadoop-supported file
system URI
:param minPartitions: min number of partitions
- @return: labeled data stored as an RDD of LabeledPoint
+ :return: labeled data stored as an RDD of LabeledPoint
>>> from tempfile import NamedTemporaryFile
>>> from pyspark.mllib.util import MLUtils
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8edb7f3..8bcc67a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -286,13 +286,13 @@ class RDD(object):
@property
def context(self):
"""
- The L{SparkContext} that this RDD was created on.
+ The :class:`SparkContext` that this RDD was created on.
"""
return self.ctx
def cache(self):
"""
- Persist this RDD with the default storage level (C{MEMORY_ONLY}).
+ Persist this RDD with the default storage level (`MEMORY_ONLY`).
"""
self.is_cached = True
self.persist(StorageLevel.MEMORY_ONLY)
@@ -303,7 +303,7 @@ class RDD(object):
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
- If no storage level is specified defaults to (C{MEMORY_ONLY}).
+ If no storage level is specified defaults to (`MEMORY_ONLY`).
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
@@ -330,7 +330,7 @@ class RDD(object):
def checkpoint(self):
"""
Mark this RDD for checkpointing. It will be saved to a file inside the
- checkpoint directory set with L{SparkContext.setCheckpointDir()} and
+ checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
all references to its parent RDDs will be removed. This function must
be called before any job has been executed on this RDD. It is strongly
recommended that this RDD is persisted in memory, otherwise saving it
@@ -360,9 +360,9 @@ class RDD(object):
This is NOT safe to use with dynamic allocation, which removes executors along
with their cached blocks. If you must use both features, you are advised to set
- L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+ `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
- The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
+ The checkpoint directory set through :meth:`SparkContext.setCheckpointDir` is not used.
"""
self._jrdd.rdd().localCheckpoint()
@@ -786,8 +786,8 @@ class RDD(object):
def cartesian(self, other):
"""
Return the Cartesian product of this RDD and another one, that is, the
- RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
- C{b} is in C{other}.
+ RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and
+ ``b`` is in `other`.
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
@@ -960,9 +960,9 @@ class RDD(object):
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
- The function C{op(t1, t2)} is allowed to modify C{t1} and return it
+ The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it
as its result value to avoid object allocation; however, it should not
- modify C{t2}.
+ modify ``t2``.
This behaves somewhat differently from fold operations implemented
for non-distributed collections in functional languages like Scala.
@@ -995,9 +995,9 @@ class RDD(object):
the partitions, using a given combine functions and a neutral "zero
value."
- The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
+ The functions ``op(t1, t2)`` is allowed to modify ``t1`` and return it
as its result value to avoid object allocation; however, it should not
- modify C{t2}.
+ modify ``t2``.
The first function (seqOp) can return a different result type, U, than
the type of this RDD. Thus, we need one operation for merging a T into
@@ -1128,7 +1128,7 @@ class RDD(object):
def stats(self):
"""
- Return a L{StatCounter} object that captures the mean, variance
+ Return a :class:`StatCounter` object that captures the mean, variance
and count of the RDD's elements in one operation.
"""
def redFunc(left_counter, right_counter):
@@ -1467,10 +1467,10 @@ class RDD(object):
def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None,
valueConverter=None):
"""
- Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
converted for output using either user specified converters or, by default,
- L{org.apache.spark.api.python.JavaToWritableConverter}.
+ "org.apache.spark.api.python.JavaToWritableConverter".
:param conf: Hadoop job configuration, passed in as a dict
:param keyConverter: (None by default)
@@ -1484,11 +1484,11 @@ class RDD(object):
def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
keyConverter=None, valueConverter=None, conf=None):
"""
- Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
will be inferred if not specified. Keys and values are converted for output using either
- user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
- C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The
+ `conf` is applied on top of the base Hadoop conf associated with the SparkContext
of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
:param path: path to Hadoop file
@@ -1511,10 +1511,10 @@ class RDD(object):
def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
"""
- Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
converted for output using either user specified converters or, by default,
- L{org.apache.spark.api.python.JavaToWritableConverter}.
+ "org.apache.spark.api.python.JavaToWritableConverter".
:param conf: Hadoop job configuration, passed in as a dict
:param keyConverter: (None by default)
@@ -1529,11 +1529,11 @@ class RDD(object):
keyConverter=None, valueConverter=None, conf=None,
compressionCodecClass=None):
"""
- Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
system, using the old Hadoop OutputFormat API (mapred package). Key and value types
will be inferred if not specified. Keys and values are converted for output using either
- user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
- C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The
+ `conf` is applied on top of the base Hadoop conf associated with the SparkContext
of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
:param path: path to Hadoop file
@@ -1558,8 +1558,8 @@ class RDD(object):
def saveAsSequenceFile(self, path, compressionCodecClass=None):
"""
- Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
- system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
+ Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
+ system, using the "org.apache.hadoop.io.Writable" types that we convert from the
RDD's key and value types. The mechanism is as follows:
1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
@@ -1575,7 +1575,7 @@ class RDD(object):
def saveAsPickleFile(self, path, batchSize=10):
"""
Save this RDD as a SequenceFile of serialized objects. The serializer
- used is L{pyspark.serializers.PickleSerializer}, default batch size
+ used is :class:`pyspark.serializers.PickleSerializer`, default batch size
is 10.
>>> tmpFile = NamedTemporaryFile(delete=True)
@@ -1595,8 +1595,8 @@ class RDD(object):
"""
Save this RDD as a text file, using string representations of elements.
- @param path: path to text file
- @param compressionCodecClass: (None by default) string i.e.
+ :param path: path to text file
+ :param compressionCodecClass: (None by default) string i.e.
"org.apache.hadoop.io.compress.GzipCodec"
>>> tempFile = NamedTemporaryFile(delete=True)
@@ -1685,8 +1685,8 @@ class RDD(object):
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
- Output will be partitioned with C{numPartitions} partitions, or
- the default parallelism level if C{numPartitions} is not specified.
+ Output will be partitioned with `numPartitions` partitions, or
+ the default parallelism level if `numPartitions` is not specified.
Default partitioner is hash-partition.
>>> from operator import add
@@ -1737,10 +1737,10 @@ class RDD(object):
def join(self, other, numPartitions=None):
"""
Return an RDD containing all pairs of elements with matching keys in
- C{self} and C{other}.
+ `self` and `other`.
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
- (k, v1) is in C{self} and (k, v2) is in C{other}.
+ (k, v1) is in `self` and (k, v2) is in `other`.
Performs a hash join across the cluster.
@@ -1753,11 +1753,11 @@ class RDD(object):
def leftOuterJoin(self, other, numPartitions=None):
"""
- Perform a left outer join of C{self} and C{other}.
+ Perform a left outer join of `self` and `other`.
- For each element (k, v) in C{self}, the resulting RDD will either
- contain all pairs (k, (v, w)) for w in C{other}, or the pair
- (k, (v, None)) if no elements in C{other} have key k.
+ For each element (k, v) in `self`, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in `other`, or the pair
+ (k, (v, None)) if no elements in `other` have key k.
Hash-partitions the resulting RDD into the given number of partitions.
@@ -1770,11 +1770,11 @@ class RDD(object):
def rightOuterJoin(self, other, numPartitions=None):
"""
- Perform a right outer join of C{self} and C{other}.
+ Perform a right outer join of `self` and `other`.
- For each element (k, w) in C{other}, the resulting RDD will either
+ For each element (k, w) in `other`, the resulting RDD will either
contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
- if no elements in C{self} have key k.
+ if no elements in `self` have key k.
Hash-partitions the resulting RDD into the given number of partitions.
@@ -1787,15 +1787,15 @@ class RDD(object):
def fullOuterJoin(self, other, numPartitions=None):
"""
- Perform a right outer join of C{self} and C{other}.
+ Perform a right outer join of `self` and `other`.
- For each element (k, v) in C{self}, the resulting RDD will either
- contain all pairs (k, (v, w)) for w in C{other}, or the pair
- (k, (v, None)) if no elements in C{other} have key k.
+ For each element (k, v) in `self`, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in `other`, or the pair
+ (k, (v, None)) if no elements in `other` have key k.
- Similarly, for each element (k, w) in C{other}, the resulting RDD will
- either contain all pairs (k, (v, w)) for v in C{self}, or the pair
- (k, (None, w)) if no elements in C{self} have key k.
+ Similarly, for each element (k, w) in `other`, the resulting RDD will
+ either contain all pairs (k, (v, w)) for v in `self`, or the pair
+ (k, (None, w)) if no elements in `self` have key k.
Hash-partitions the resulting RDD into the given number of partitions.
@@ -1891,11 +1891,11 @@ class RDD(object):
Users provide three functions:
- - C{createCombiner}, which turns a V into a C (e.g., creates
+ - `createCombiner`, which turns a V into a C (e.g., creates
a one-element list)
- - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
+ - `mergeValue`, to merge a V into a C (e.g., adds it to the end of
a list)
- - C{mergeCombiners}, to combine two C's into a single one (e.g., merges
+ - `mergeCombiners`, to combine two C's into a single one (e.g., merges
the lists)
To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
@@ -2072,9 +2072,9 @@ class RDD(object):
# TODO: add variant with custom parittioner
def cogroup(self, other, numPartitions=None):
"""
- For each key k in C{self} or C{other}, return a resulting RDD that
- contains a tuple with the list of values for that key in C{self} as
- well as C{other}.
+ For each key k in `self` or `other`, return a resulting RDD that
+ contains a tuple with the list of values for that key in `self` as
+ well as `other`.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
@@ -2106,8 +2106,8 @@ class RDD(object):
def subtractByKey(self, other, numPartitions=None):
"""
- Return each (key, value) pair in C{self} that has no pair with matching
- key in C{other}.
+ Return each (key, value) pair in `self` that has no pair with matching
+ key in `other`.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
@@ -2121,7 +2121,7 @@ class RDD(object):
def subtract(self, other, numPartitions=None):
"""
- Return each value in C{self} that is not contained in C{other}.
+ Return each value in `self` that is not contained in `other`.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
@@ -2134,7 +2134,7 @@ class RDD(object):
def keyBy(self, f):
"""
- Creates tuples of the elements in this RDD by applying C{f}.
+ Creates tuples of the elements in this RDD by applying `f`.
>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
@@ -2260,7 +2260,7 @@ class RDD(object):
Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
n is the number of partitions. So there may exist gaps, but this
method won't trigger a spark job, which is different from
- L{zipWithIndex}
+ :meth:`zipWithIndex`.
>>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
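
A small sketch of the pair-RDD joins documented in the hunks above (in the
spirit of the doctests in `rdd.py`; assumes an active SparkContext `sc`):

    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("a", 3)])
    sorted(x.join(y).collect())            # [('a', (1, 2)), ('a', (1, 3))]
    sorted(x.leftOuterJoin(y).collect())   # [('a', (1, 2)), ('a', (1, 3)), ('b', (4, None))]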
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ddca2a7..00f6081 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -19,12 +19,12 @@
PySpark supports custom serializers for transferring data; this can improve
performance.
-By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
-C{cPickle} serializer, which can serialize nearly any Python object.
-Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
+By default, PySpark uses :class:`PickleSerializer` to serialize objects using Python's
+`cPickle` serializer, which can serialize nearly any Python object.
+Other serializers, like :class:`MarshalSerializer`, support fewer datatypes but can be
faster.
-The serializer is chosen when creating L{SparkContext}:
+The serializer is chosen when creating :class:`SparkContext`:
>>> from pyspark.context import SparkContext
>>> from pyspark.serializers import MarshalSerializer
@@ -34,7 +34,7 @@ The serializer is chosen when creating L{SparkContext}:
>>> sc.stop()
PySpark serializes objects in batches; by default, the batch size is chosen
based
-on the size of objects and is also configurable by SparkContext's C{batchSize}
+on the size of objects and is also configurable by SparkContext's `batchSize`
parameter:
>>> sc = SparkContext('local', 'test', batchSize=2)
@@ -129,7 +129,7 @@ class FramedSerializer(Serializer):
"""
Serializer that writes objects as a stream of (length, data) pairs,
- where C{length} is a 32-bit integer and data is C{length} bytes.
+ where `length` is a 32-bit integer and data is `length` bytes.
"""
def __init__(self):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e666973..87d4b81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -408,7 +408,7 @@ class DataFrame(object):
"""Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
logical plan of this DataFrame, which is especially useful in iterative algorithms where the
plan may grow exponentially. It will be saved to files inside the checkpoint
- directory set with L{SparkContext.setCheckpointDir()}.
+ directory set with :meth:`SparkContext.setCheckpointDir`.
:param eager: Whether to checkpoint this DataFrame immediately
@@ -581,9 +581,9 @@ class DataFrame(object):
@since(1.3)
def cache(self):
- """Persists the :class:`DataFrame` with the default storage level (C{MEMORY_AND_DISK}).
+ """Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`).
- .. note:: The default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0.
+ .. note:: The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
"""
self.is_cached = True
self._jdf.cache()
@@ -594,9 +594,9 @@ class DataFrame(object):
"""Sets the storage level to persist the contents of the :class:`DataFrame` across
operations after the first time it is computed. This can only be used to assign
a new storage level if the :class:`DataFrame` does not have a storage level set yet.
- If no storage level is specified defaults to (C{MEMORY_AND_DISK}).
+ If no storage level is specified defaults to (`MEMORY_AND_DISK`).
- .. note:: The default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0.
+ .. note:: The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
"""
self.is_cached = True
javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
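
For illustration (a sketch, assuming an active SparkSession `spark`):

    df = spark.range(10)
    df.persist()       # MEMORY_AND_DISK by default since 2.0
    df.count()         # first action materializes the cached data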
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index f9b12f1..da84fc1 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1405,7 +1405,7 @@ def _create_row(fields, values):
class Row(tuple):
"""
- A row in L{DataFrame}.
+ A row in :class:`DataFrame`.
The fields in it can be accessed:
* like attributes (``row.key``)
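
For example (a minimal sketch; Row also supports key-style access):

    from pyspark.sql import Row

    row = Row(name="Alice", age=11)
    row.name       # 'Alice' (attribute-style access)
    row["name"]    # 'Alice' (key-style access)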
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 6fbe26b6..769121c 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -33,7 +33,7 @@ class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
represents the connection to a Spark cluster, and can be used to create
- L{DStream} various input sources. It can be from an existing L{SparkContext}.
+ :class:`DStream` various input sources. It can be from an existing :class:`SparkContext`.
After creating and transforming DStreams, the streaming computation can
be started and stopped using `context.start()` and `context.stop()`,
respectively. `context.awaitTermination()` allows the current thread
@@ -48,8 +48,8 @@ class StreamingContext(object):
"""
Create a new StreamingContext.
- @param sparkContext: L{SparkContext} object.
- @param batchDuration: the time interval (in seconds) at which streaming
+ :param sparkContext: :class:`SparkContext` object.
+ :param batchDuration: the time interval (in seconds) at which streaming
data will be divided into batches
"""
@@ -92,8 +92,8 @@ class StreamingContext(object):
recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
will be used to create a new context.
- @param checkpointPath: Checkpoint directory used in an earlier streaming program
- @param setupFunc: Function to create a new context and setup DStreams
+ :param checkpointPath: Checkpoint directory used in an earlier streaming program
+ :param setupFunc: Function to create a new context and setup DStreams
"""
cls._ensure_initialized()
gw = SparkContext._gateway
@@ -149,10 +149,10 @@ class StreamingContext(object):
valid checkpoint data, then setupFunc will be called to create a new context and setup
DStreams.
- @param checkpointPath: Checkpoint directory used in an earlier streaming program. Can be
+ :param checkpointPath: Checkpoint directory used in an earlier streaming program. Can be
None if the intention is to always create a new context when there
is no active context.
- @param setupFunc: Function to create a new JavaStreamingContext and setup DStreams
+ :param setupFunc: Function to create a new JavaStreamingContext and setup DStreams
"""
if setupFunc is None:
@@ -183,7 +183,7 @@ class StreamingContext(object):
"""
Wait for the execution to stop.
- @param timeout: time to wait in seconds
+ :param timeout: time to wait in seconds
"""
if timeout is None:
self._jssc.awaitTermination()
@@ -196,7 +196,7 @@ class StreamingContext(object):
throw the reported error during the execution; or `false` if the
waiting time elapsed before returning from the method.
- @param timeout: time to wait in seconds
+ :param timeout: time to wait in seconds
"""
return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
@@ -205,8 +205,8 @@ class StreamingContext(object):
Stop the execution of the streams, with option of ensuring all
received data has been processed.
- @param stopSparkContext: Stop the associated SparkContext or not
- @param stopGracefully: Stop gracefully by waiting for the processing
+ :param stopSparkContext: Stop the associated SparkContext or not
+ :param stopGracefully: Stop gracefully by waiting for the processing
of all received data to be completed
"""
self._jssc.stop(stopSparkContext, stopGraceFully)
@@ -223,7 +223,7 @@ class StreamingContext(object):
the RDDs (if the developer wishes to query old data outside the
DStream computation).
- @param duration: Minimum duration (in seconds) that each DStream
+ :param duration: Minimum duration (in seconds) that each DStream
should remember its RDDs
"""
self._jssc.remember(self._jduration(duration))
@@ -233,7 +233,7 @@ class StreamingContext(object):
Sets the context to periodically checkpoint the DStream operations for
master
fault-tolerance. The graph will be checkpointed every batch interval.
- @param directory: HDFS-compatible directory where the checkpoint data
+ :param directory: HDFS-compatible directory where the checkpoint data
will be reliably stored
"""
self._jssc.checkpoint(directory)
@@ -244,9 +244,9 @@ class StreamingContext(object):
a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited
lines.
- @param hostname: Hostname to connect to for receiving data
- @param port: Port to connect to for receiving data
- @param storageLevel: Storage level to use for storing the received objects
+ :param hostname: Hostname to connect to for receiving data
+ :param port: Port to connect to for receiving data
+ :param storageLevel: Storage level to use for storing the received objects
"""
jlevel = self._sc._getJavaStorageLevel(storageLevel)
return DStream(self._jssc.socketTextStream(hostname, port, jlevel),
self,
@@ -270,8 +270,8 @@ class StreamingContext(object):
them from another location within the same file system.
File names starting with . are ignored.
- @param directory: Directory to load data from
- @param recordLength: Length of each record in bytes
+ :param directory: Directory to load data from
+ :param recordLength: Length of each record in bytes
"""
return DStream(self._jssc.binaryRecordsStream(directory,
recordLength), self,
NoOpSerializer())
@@ -290,9 +290,9 @@ class StreamingContext(object):
.. note:: Changes to the queue after the stream is created will not be
recognized.
- @param rdds: Queue of RDDs
- @param oneAtATime: pick one rdd each time or pick all of them once.
- @param default: The default rdd if no more in rdds
+ :param rdds: Queue of RDDs
+ :param oneAtATime: pick one rdd each time or pick all of them once.
+ :param default: The default rdd if no more in rdds
"""
if default and not isinstance(default, RDD):
default = self._sc.parallelize(default)
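
A minimal sketch of creating a StreamingContext and one of the input streams
described above (assumes an active SparkContext `sc`; host and port are
illustrative):

    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 1)                     # 1-second batch interval
    lines = ssc.socketTextStream("localhost", 9999)   # UTF8, '\n'-delimited lines
    lines.pprint()
    # ssc.start(); ssc.awaitTermination()             # start the computation when ready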
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index c253e5c..60562a6 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -41,11 +41,11 @@ class DStream(object):
"""
A Discretized Stream (DStream), the basic abstraction in Spark Streaming,
is a continuous sequence of RDDs (of the same type) representing a
- continuous stream of data (see L{RDD} in the Spark core documentation
+ continuous stream of data (see :class:`RDD` in the Spark core documentation
for more details on RDDs).
DStreams can either be created from live data (such as, data from TCP
- sockets, etc.) using a L{StreamingContext} or it can be
+ sockets, etc.) using a :class:`StreamingContext` or it can be
generated by transforming existing DStreams using operations such as
`map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
program is running, each DStream periodically generates a RDD, either
@@ -167,7 +167,7 @@ class DStream(object):
"""
Print the first num elements of each RDD generated in this DStream.
- @param num: the number of elements from the first will be printed.
+ :param num: the number of elements from the first will be printed.
"""
def takeAndPrint(time, rdd):
taken = rdd.take(num + 1)
@@ -210,7 +210,7 @@ class DStream(object):
def cache(self):
"""
Persist the RDDs of this DStream with the default storage level
- (C{MEMORY_ONLY}).
+ (`MEMORY_ONLY`).
"""
self.is_cached = True
self.persist(StorageLevel.MEMORY_ONLY)
@@ -229,7 +229,7 @@ class DStream(object):
"""
Enable periodic checkpointing of RDDs of this DStream
- @param interval: time in seconds, after each period of that, generated
+ :param interval: time in seconds, after each period of that, generated
RDD will be checkpointed
"""
self.is_checkpointed = True
@@ -333,7 +333,7 @@ class DStream(object):
"""
Return a new DStream by unifying data of another DStream with this
DStream.
- @param other: Another DStream having the same interval (i.e., slideDuration)
+ :param other: Another DStream having the same interval (i.e., slideDuration)
as this DStream.
"""
if self._slideDuration != other._slideDuration:
@@ -429,9 +429,9 @@ class DStream(object):
Return a new DStream in which each RDD contains all the elements in seen in a
sliding window of time over this DStream.
- @param windowDuration: width of the window; must be a multiple of this DStream's
+ :param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
- @param slideDuration: sliding interval of the window (i.e., the interval after which
+ :param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
"""
@@ -455,13 +455,13 @@ class DStream(object):
2. "inverse reduce" the old values that left the window (e.g.,
subtracting old counts)
This is more efficient than `invReduceFunc` is None.
- @param reduceFunc: associative and commutative reduce function
- @param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y,
+ :param reduceFunc: associative and commutative reduce function
+ :param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y,
and invertible x:
`invReduceFunc(reduceFunc(x, y), x) = y`
- @param windowDuration: width of the window; must be a multiple of this DStream's
+ :param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
- @param slideDuration: sliding interval of the window (i.e., the interval after which
+ :param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
"""
@@ -487,12 +487,12 @@ class DStream(object):
Return a new DStream in which each RDD contains the count of distinct elements in
RDDs in a sliding window over this DStream.
- @param windowDuration: width of the window; must be a multiple of this DStream's
+ :param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
- @param slideDuration: sliding interval of the window (i.e., the interval after which
+ :param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
- @param numPartitions: number of partitions of each RDD in the new DStream.
+ :param numPartitions: number of partitions of each RDD in the new DStream.
"""
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
@@ -504,12 +504,12 @@ class DStream(object):
Return a new DStream by applying `groupByKey` over a sliding window.
Similar to `DStream.groupByKey()`, but applies it over a sliding window.
- @param windowDuration: width of the window; must be a multiple of this DStream's
+ :param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
- @param slideDuration: sliding interval of the window (i.e., the interval after which
+ :param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
- @param numPartitions: Number of partitions of each RDD in the new DStream.
+ :param numPartitions: Number of partitions of each RDD in the new DStream.
"""
ls = self.mapValues(lambda x: [x])
grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a,
lambda a, b: a[len(b):],
@@ -528,15 +528,15 @@ class DStream(object):
`invFunc` can be None, then it will reduce all the RDDs in window,
could be slower
than having `invFunc`.
- @param func: associative and commutative reduce function
- @param invFunc: inverse function of `reduceFunc`
- @param windowDuration: width of the window; must be a multiple of this DStream's
+ :param func: associative and commutative reduce function
+ :param invFunc: inverse function of `reduceFunc`
+ :param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
- @param slideDuration: sliding interval of the window (i.e., the interval after which
+ :param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
- @param numPartitions: number of partitions of each RDD in the new DStream.
- @param filterFunc: function to filter expired key-value pairs;
+ :param numPartitions: number of partitions of each RDD in the new DStream.
+ :param filterFunc: function to filter expired key-value pairs;
only pairs that satisfy the function are retained
set this to null if you do not want to filter
"""
@@ -578,7 +578,7 @@ class DStream(object):
Return a new "state" DStream where the state for each key is updated by applying
the given function on the previous state of the key and the new values of the key.
- @param updateFunc: State update function. If this function returns None, then
+ :param updateFunc: State update function. If this function returns None, then
corresponding state key-value pair will be eliminated.
"""
if numPartitions is None:
diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index dff5e18..6d28491 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -28,7 +28,7 @@ class TaskContext(object):
Contextual information about a task which can be read or mutated during
execution. To access the TaskContext for a running task, use:
- L{TaskContext.get()}.
+ :meth:`TaskContext.get`.
"""
_taskContext = None
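
For example (a sketch, assuming an active SparkContext `sc`):

    from pyspark import TaskContext

    def partition_ids(iterator):
        ctx = TaskContext.get()   # only set inside a running task; None on the driver
        yield ctx.partitionId()

    sc.parallelize(range(4), 2).mapPartitions(partition_ids).collect()   # [0, 1]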
diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py
index 4c27f8a..a6abc2e 100644
--- a/python/pyspark/testing/streamingutils.py
+++ b/python/pyspark/testing/streamingutils.py
@@ -137,9 +137,9 @@ class PySparkStreamingTestCase(unittest.TestCase):
def _test_func(self, input, func, expected, sort=False, input2=None):
"""
- @param input: dataset for the test. This should be list of lists.
- @param func: wrapped function. This function should return PythonDStream object.
- @param expected: expected output for this testcase.
+ :param input: dataset for the test. This should be list of lists.
+ :param func: wrapped function. This function should return PythonDStream object.
+ :param expected: expected output for this testcase.
"""
if not isinstance(input[0], RDD):
input = [self.sc.parallelize(d, 1) for d in input]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]