This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c022e0e1655f [SPARK-54519][DOCS] Reformat tutorial comments so it's easier to read on the website
c022e0e1655f is described below
commit c022e0e1655f71ba4f0f888c261e0bfec630634c
Author: Tian Gao <[email protected]>
AuthorDate: Wed Nov 26 11:55:57 2025 +0900
[SPARK-54519][DOCS] Reformat tutorial comments so it's easier to read on the website
### What changes were proposed in this pull request?
Reformatted two files in the tutorial that had multiple comment lines longer than 85 characters.
### Why are the changes needed?
Our docs website renders code sections in a fixed-width display. Lines that are too long are very difficult to read.
<img width="1464" height="1014" alt="image" src="https://github.com/user-attachments/assets/e85dc920-2b1c-4457-847b-ceedaaf00931" />
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`make html` works locally to build the docs.
### Was this patch authored or co-authored using generative AI tooling?
Surprisingly no - but maybe I should've used it.
Closes #53225 from gaogaotiantian/fix-docs-width.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../source/tutorial/sql/python_data_source.rst | 29 ++--
python/docs/source/tutorial/sql/python_udtf.rst | 174 +++++++++++----------
2 files changed, 114 insertions(+), 89 deletions(-)
diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst
index 78ffeda0db1c..297d45304d64 100644
--- a/python/docs/source/tutorial/sql/python_data_source.rst
+++ b/python/docs/source/tutorial/sql/python_data_source.rst
@@ -247,20 +247,23 @@ This is a dummy streaming data reader that generate 2 rows in every microbatch.
def partitions(self, start: dict, end: dict) -> list[InputPartition]:
"""
- Plans the partitioning of the current microbatch defined by start and end offset,
- it needs to return a sequence of :class:`InputPartition` object.
+ Plans the partitioning of the current microbatch defined by start
+ and end offset, it needs to return a sequence of
+ :class:`InputPartition` object.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict) -> None:
"""
- This is invoked when the query has finished processing data before end offset, this can be used to clean up resource.
+ This is invoked when the query has finished processing data before end
+ offset, this can be used to clean up resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
- Takes a partition as an input and read an iterator of tuples from the data source.
+ Takes a partition as an input and read an iterator of tuples from the
+ data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
@@ -305,7 +308,8 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
"""
- Takes start offset as an input, return an iterator of tuples and the start offset of next read.
+ Takes start offset as an input, return an iterator of tuples and
+ the start offset of next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
@@ -313,7 +317,8 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
- Takes start and end offset as input and read an iterator of data deterministically.
+ Takes start and end offset as input and read an iterator of data
+ deterministically.
This is called whe query replay batches during restart or after failure.
"""
start_idx = start["offset"]
@@ -322,7 +327,8 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
def commit(self, end: dict) -> None:
"""
- This is invoked when the query has finished processing data before end offset, this can be used to clean up resource.
+ This is invoked when the query has finished processing data before end
+ offset, this can be used to clean up resource.
"""
pass
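To make the offset contract in the docstrings above concrete, here is a standalone sketch in plain Python (no Spark dependency; the method names mirror the simple stream reader described in the tutorial, but the class name and everything else here is illustrative only):

```python
from typing import Dict, Iterator, Tuple


class DummySimpleStreamReaderSketch:
    """Illustrative stand-in: each read() returns two rows plus the
    start offset of the next microbatch, as the docstrings describe."""

    def initialOffset(self) -> Dict:
        # The first microbatch starts at offset 0.
        return {"offset": 0}

    def read(self, start: Dict) -> Tuple[Iterator[Tuple], Dict]:
        # Return an iterator of tuples and the start offset of the next read.
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return it, {"offset": start_idx + 2}

    def readBetweenOffsets(self, start: Dict, end: Dict) -> Iterator[Tuple]:
        # Deterministic replay between two offsets, used when the query
        # replays batches during restart or after failure.
        return iter([(i,) for i in range(start["offset"], end["offset"])])

    def commit(self, end: Dict) -> None:
        # Clean up any state for data before `end`; nothing to do here.
        pass


reader = DummySimpleStreamReaderSketch()
rows, next_offset = reader.read(reader.initialOffset())
print(list(rows), next_offset)  # two rows, then the next start offset
```

Running `read` repeatedly with the returned offset walks the stream forward two rows at a time, while `readBetweenOffsets` reproduces any already-read range exactly.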
@@ -363,8 +369,10 @@ This is a streaming data writer that write the metadata information of each micr
def commit(self, messages: List[Optional[SimpleCommitMessage]], batchId: int) -> None:
"""
- Receives a sequence of :class:`SimpleCommitMessage` when all write tasks succeed and decides what to do with it.
- In this FakeStreamWriter, we write the metadata of the microbatch(number of rows and partitions) into a json file inside commit().
+ Receives a sequence of :class:`SimpleCommitMessage` when all write tasks
+ succeed and decides what to do with it.
+ In this FakeStreamWriter, we write the metadata of the microbatch
+ (number of rows and partitions) into a json file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
@@ -372,7 +380,8 @@ This is a streaming data writer that write the metadata information of each micr
def abort(self, messages: List[Optional[SimpleCommitMessage]], batchId: int) -> None:
"""
- Receives a sequence of :class:`SimpleCommitMessage` from successful tasks when some tasks fail and decides what to do with it.
+ Receives a sequence of :class:`SimpleCommitMessage` from successful tasks
+ when some tasks fail and decides what to do with it.
In this FakeStreamWriter, we write a failure message into a txt file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
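The FakeStreamWriter docstrings above describe `commit()` persisting per-batch metadata and `abort()` recording a failure marker. A minimal dependency-free sketch of that bookkeeping (the `SimpleCommitMessage` dataclass and class name here are stand-ins, not the real pyspark imports):

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SimpleCommitMessage:
    # Stand-in commit message: which partition wrote how many rows.
    partition_id: int
    count: int


class FakeStreamWriterSketch:
    def __init__(self, path: str):
        self.path = path

    def commit(self, messages: List[Optional[SimpleCommitMessage]], batchId: int) -> None:
        # Persist the microbatch metadata (number of partitions and rows).
        status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
        with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
            file.write(json.dumps(status) + "\n")

    def abort(self, messages: List[Optional[SimpleCommitMessage]], batchId: int) -> None:
        # Record a failure marker for the aborted microbatch.
        with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
            file.write(f"failed in batch {batchId}")


out_dir = tempfile.mkdtemp()
writer = FakeStreamWriterSketch(out_dir)
writer.commit([SimpleCommitMessage(0, 2), SimpleCommitMessage(1, 2)], batchId=0)
print(open(os.path.join(out_dir, "0.json")).read())  # {"num_partitions": 2, "rows": 4}
```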
diff --git a/python/docs/source/tutorial/sql/python_udtf.rst b/python/docs/source/tutorial/sql/python_udtf.rst
index a989848448c9..26142e42e363 100644
--- a/python/docs/source/tutorial/sql/python_udtf.rst
+++ b/python/docs/source/tutorial/sql/python_udtf.rst
@@ -44,96 +44,104 @@ To implement a Python UDTF, you first need to define a class implementing the me
UDTF is instantiated on the executor side.
Any class fields assigned in this method will be available for subsequent
- calls to the `eval` and `terminate` methods. This class instance will remain
- alive until all rows in the current partition have been consumed by the `eval`
- method.
+ calls to the `eval` and `terminate` methods. This class instance will
+ remain alive until all rows in the current partition have been consumed
+ by the `eval` method.
Notes
-----
- You cannot create or reference the Spark session within the UDTF. Any
attempt to do so will result in a serialization error.
- - If the below `analyze` method is implemented, it is also possible to define this
- method as: `__init__(self, analyze_result: AnalyzeResult)`. In this case, the result
- of the `analyze` method is passed into all future instantiations of this UDTF class.
- In this way, the UDTF may inspect the schema and metadata of the output table as
- needed during execution of other methods in this class. Note that it is possible to
- create a subclass of the `AnalyzeResult` class if desired for purposes of passing
- custom information generated just once during UDTF analysis to other method calls;
- this can be especially useful if this initialization is expensive.
+ - If the below `analyze` method is implemented, it is also possible to
+ define this method as: `__init__(self, analyze_result: AnalyzeResult)`.
+ In this case, the result of the `analyze` method is passed into all
+ future instantiations of this UDTF class. In this way, the UDTF may
+ inspect the schema and metadata of the output table as needed during
+ execution of other methods in this class. Note that it is possible to
+ create a subclass of the `AnalyzeResult` class if desired for purposes
+ of passing custom information generated just once during UDTF analysis
+ to other method calls; this can be especially useful if this
+ initialization is expensive.
"""
...
@staticmethod
def analyze(self, *args: AnalyzeArgument) -> AnalyzeResult:
"""
- Static method to compute the output schema of a particular call to this function in
- response to the arguments provided.
+ Static method to compute the output schema of a particular call to this
+ function in response to the arguments provided.
- This method is optional and only needed if the registration of the UDTF did not provide
- a static output schema to be use for all calls to the function. In this context,
- `output schema` refers to the ordered list of the names and types of the columns in the
- function's result table.
+ This method is optional and only needed if the registration of the UDTF
+ did not provide a static output schema to be use for all calls to the
+ function. In this context, `output schema` refers to the ordered list
+ of the names and types of the columns in the function's result table.
- This method accepts zero or more parameters mapping 1:1 with the arguments provided to
- the particular UDTF call under consideration. Each parameter is an instance of the
- `AnalyzeArgument` class.
+ This method accepts zero or more parameters mapping 1:1 with the
+ arguments provided to the particular UDTF call under consideration.
+ Each parameter is an instance of the `AnalyzeArgument` class.
`AnalyzeArgument` fields
------------------------
dataType: DataType
- Indicates the type of the provided input argument to this particular UDTF call.
- For input table arguments, this is a StructType representing the table's columns.
+ Indicates the type of the provided input argument to this particular
+ UDTF call. For input table arguments, this is a StructType
+ representing the table's columns.
value: Optional[Any]
- The value of the provided input argument to this particular UDTF call. This is
- `None` for table arguments, or for literal scalar arguments that are not constant.
+ The value of the provided input argument to this particular UDTF
+ call. This is `None` for table arguments, or for literal scalar
+ arguments that are not constant.
isTable: bool
- This is true if the provided input argument to this particular UDTF call is a
- table argument.
+ This is true if the provided input argument to this particular UDTF
+ call is a table argument.
isConstantExpression: bool
- This is true if the provided input argument to this particular UDTF call is either a
- literal or other constant-foldable scalar expression.
-
- This method returns an instance of the `AnalyzeResult` class which includes the result
- table's schema as a StructType. If the UDTF accepts an input table argument, then the
- `AnalyzeResult` can also include a requested way to partition and order the rows of
- the input table across several UDTF calls. See below for more information about UDTF
- table arguments and how to call them in SQL queries, including the WITH SINGLE
- PARTITION clause (corresponding to the `withSinglePartition` field here), PARTITION BY
- clause (corresponding to the `partitionBy` field here), ORDER BY clause (corresponding
- to the `orderBy` field here), and passing table subqueries as arguments (corresponding
- to the `select` field here).
+ This is true if the provided input argument to this particular UDTF
+ call is either a literal or other constant-foldable scalar expression.
+
+ This method returns an instance of the `AnalyzeResult` class which
+ includes the result table's schema as a StructType. If the UDTF accepts
+ an input table argument, then the `AnalyzeResult` can also include a
+ requested way to partition and order the rows of the input table across
+ several UDTF calls. See below for more information about UDTF table
+ arguments and how to call them in SQL queries, including the WITH SINGLE
+ PARTITION clause (corresponding to the `withSinglePartition` field here),
+ PARTITION BY clause (corresponding to the `partitionBy` field here),
+ ORDER BY clause (corresponding to the `orderBy` field here), and passing
+ table subqueries as arguments (corresponding to the `select` field here).
`AnalyzeResult` fields
----------------------
schema: StructType
The schema of the result table.
withSinglePartition: bool = False
- If True, the query planner will arrange a repartitioning operation from the previous
- execution stage such that all rows of the input table are consumed by the `eval`
- method from exactly one instance of the UDTF class.
+ If True, the query planner will arrange a repartitioning operation
+ from the previous execution stage such that all rows of the input
+ table are consumed by the `eval` method from exactly one instance
+ of the UDTF class.
partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
- If non-empty, the query planner will arrange a repartitioning such that all rows
- with each unique combination of values of the partitioning expressions are consumed
- by a separate unique instance of the UDTF class.
+ If non-empty, the query planner will arrange a repartitioning such
+ that all rows with each unique combination of values of the
+ partitioning expressions are consumed by a separate unique instance
+ of the UDTF class.
orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
- If non-empty, this specifies the requested ordering of rows within each partition.
+ If non-empty, this specifies the requested ordering of rows within
+ each partition.
select: Sequence[SelectedColumn] = field(default_factory=tuple)
- If non-empty, this is a sequence of expressions that the UDTF is specifying for
- Catalyst to evaluate against the columns in the input TABLE argument. The UDTF then
- receives one input attribute for each name in the list, in the order they are
- listed.
+ If non-empty, this is a sequence of expressions that the UDTF is
+ specifying for Catalyst to evaluate against the columns in the input
+ TABLE argument. The UDTF then receives one input attribute for each
+ name in the list, in the order they are listed.
Notes
-----
- - It is possible for the `analyze` method to accept the exact arguments expected,
- mapping 1:1 with the arguments provided to the UDTF call.
- - The `analyze` method can instead choose to accept positional arguments if desired
- (using `*args`) or keyword arguments (using `**kwargs`).
+ - It is possible for the `analyze` method to accept the exact arguments
+ expected, mapping 1:1 with the arguments provided to the UDTF call.
+ - The `analyze` method can instead choose to accept positional arguments
+ if desired (using `*args`) or keyword arguments (using `**kwargs`).
Examples
--------
- This is an `analyze` implementation that returns one output column for each word in the
- input string argument.
+ This is an `analyze` implementation that returns one output column for
+ each word in the input string argument.
>>> @staticmethod
... def analyze(text: str) -> AnalyzeResult:
@@ -167,9 +175,9 @@ To implement a Python UDTF, you first need to define a class implementing the me
... schema = schema.add(f"word_{index}")
... return AnalyzeResult(schema=schema)
- This is an `analyze` implementation that returns a constant output schema, but add
- custom information in the result metadata to be consumed by future __init__ method
- calls:
+ This is an `analyze` implementation that returns a constant output
+ schema, but add custom information in the result metadata to be consumed
+ by future __init__ method calls:
>>> @staticmethod
... def analyze(text: str) -> AnalyzeResult:
@@ -187,15 +195,19 @@ To implement a Python UDTF, you first need to define a class implementing the me
... word for word in words
... if word == 'a' or word == 'an' or word == 'the')))
- This is an `analyze` implementation that returns a constant output schema, and also
- requests to select a subset of columns from the input table and for the input table to
- be partitioned across several UDTF calls based on the values of the `date` column.
- A SQL query may this UDTF passing a table argument like "SELECT * FROM udtf(TABLE(t))".
- Then this `analyze` method specifies additional constraints on the input table:
- (1) The input table must be partitioned across several UDTF calls based on the values of
- the month value of each `date` column.
- (2) The rows within each partition will arrive ordered by the `date` column.
- (3) The UDTF will only receive the `date` and `word` columns from the input table.
+ This is an `analyze` implementation that returns a constant output
+ schema, and also requests to select a subset of columns from the input
+ table and for the input table to be partitioned across several UDTF
+ calls based on the values of the `date` column. A SQL query may this
+ UDTF passing a table argument like "SELECT * FROM udtf(TABLE(t))".
+ Then this `analyze` method specifies additional constraints on the
+ input table:
+ (1) The input table must be partitioned across several UDTF calls
+ based on the values of the month value of each `date` column.
+ (2) The rows within each partition will arrive ordered by the `date`
+ column.
+ (3) The UDTF will only receive the `date` and `word` columns from
+ the input table.
>>> @staticmethod
... def analyze(*args) -> AnalyzeResult:
@@ -242,21 +254,25 @@ To implement a Python UDTF, you first need to define a class implementing the me
Yields
------
tuple
- A tuple, list, or pyspark.sql.Row object representing a single row in the UDTF
- result table. Yield as many times as needed to produce multiple rows.
+ A tuple, list, or pyspark.sql.Row object representing a single row
+ in the UDTF result table. Yield as many times as needed to produce
+ multiple rows.
Notes
-----
- - It is also possible for UDTFs to accept the exact arguments expected, along with
- their types.
- - UDTFs can instead accept keyword arguments during the function call if needed.
- - The `eval` method can raise a `SkipRestOfInputTableException` to indicate that the
- UDTF wants to skip consuming all remaining rows from the current partition of the
- input table. This will cause the UDTF to proceed directly to the `terminate` method.
- - The `eval` method can raise any other exception to indicate that the UDTF should be
- aborted entirely. This will cause the UDTF to skip the `terminate` method and proceed
- directly to the `cleanup` method, and then the exception will be propagated to the
- query processor causing the invoking query to fail.
+ - It is also possible for UDTFs to accept the exact arguments expected,
+ along with their types.
+ - UDTFs can instead accept keyword arguments during the function call if
+ needed.
+ - The `eval` method can raise a `SkipRestOfInputTableException` to
+ indicate that the UDTF wants to skip consuming all remaining rows from
+ the current partition of the input table. This will cause the UDTF to
+ proceed directly to the `terminate` method.
+ - The `eval` method can raise any other exception to indicate that the
+ UDTF should be aborted entirely. This will cause the UDTF to skip the
+ `terminate` method and proceed directly to the `cleanup` method, and
+ then the exception will be propagated to the query processor causing
+ the invoking query to fail.
Examples
--------
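As a rough standalone illustration of the one-output-column-per-word `analyze` pattern described in the docstrings above (plain Python; `StructTypeStub` and `AnalyzeResultStub` are minimal stand-ins defined here only so the sketch runs without Spark, not the real pyspark classes):

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StructTypeStub:
    """Stand-in for a StructType: only tracks column names."""
    names: List[str] = field(default_factory=list)

    def add(self, name: str) -> "StructTypeStub":
        # Mirrors StructType.add returning a schema with one more column.
        return StructTypeStub(self.names + [name])


@dataclass
class AnalyzeResultStub:
    """Stand-in for AnalyzeResult carrying the output schema."""
    schema: StructTypeStub


def analyze(text: str) -> AnalyzeResultStub:
    # One output column per distinct word in the input string argument,
    # mirroring the `analyze` example quoted in the diff above.
    schema = StructTypeStub()
    for index, _word in enumerate(sorted(set(text.split(" ")))):
        schema = schema.add(f"word_{index}")
    return AnalyzeResultStub(schema=schema)


result = analyze("hello there hello")
print(result.schema.names)  # ['word_0', 'word_1']
```

The point of the pattern is that the output schema is computed per call from the argument value, which is exactly why `analyze` receives constant-foldable arguments with their values available.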
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]