This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b7cfa56cb9 [SPARK-48497][PYTHON][DOCS] Add an example for Python data source writer in user guide
f0b7cfa56cb9 is described below

commit f0b7cfa56cb90ef70132d9656299956cbde00b53
Author: allisonwang-db <[email protected]>
AuthorDate: Tue Jun 18 08:46:10 2024 +0900

    [SPARK-48497][PYTHON][DOCS] Add an example for Python data source writer in user guide
    
    ### What changes were proposed in this pull request?
    
    This PR adds an example for creating a simple data source writer in the user guide.
    
    ### Why are the changes needed?
    
    To improve the PySpark documentation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Verified locally.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46833 from allisonwang-db/spark-48497-ds-write-doc.
    
    Authored-by: allisonwang-db <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../source/user_guide/sql/python_data_source.rst   | 66 ++++++++++++++++++++--
 1 file changed, 60 insertions(+), 6 deletions(-)

diff --git a/python/docs/source/user_guide/sql/python_data_source.rst b/python/docs/source/user_guide/sql/python_data_source.rst
index 2386303f0da8..cdbc70699311 100644
--- a/python/docs/source/user_guide/sql/python_data_source.rst
+++ b/python/docs/source/user_guide/sql/python_data_source.rst
@@ -73,6 +73,9 @@ Method that needs to be implemented for a capability:
         def reader(self, schema: StructType):
             return FakeDataSourceReader(schema, self.options)
 
+        def writer(self, schema: StructType, overwrite: bool):
+            return FakeDataSourceWriter(self.options)
+
         def streamReader(self, schema: StructType):
             return FakeStreamReader(schema, self.options)
 
@@ -83,8 +86,8 @@ Method that needs to be implemented for a capability:
         def streamWriter(self, schema: StructType, overwrite: bool):
             return FakeStreamWriter(self.options)
 
-Implementing Reader for Python Data Source
-------------------------------------------
+Implementing Batch Reader and Writer for Python Data Source
+-----------------------------------------------------------
 **Implement the Reader**
 
 Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema.
@@ -109,6 +112,43 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Writer**
+
+Create a fake data source writer that processes each partition of data, counts the rows, and either
+prints the total count of rows after a successful write or the number of failed tasks if the writing process fails.
+
+.. code-block:: python
+
+    from dataclasses import dataclass
+    from typing import Iterator, List
+
+    from pyspark.sql.types import Row
+    from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
+
+    @dataclass
+    class SimpleCommitMessage(WriterCommitMessage):
+        partition_id: int
+        count: int
+
+    class FakeDataSourceWriter(DataSourceWriter):
+
+        def write(self, rows: Iterator[Row]) -> SimpleCommitMessage:
+            from pyspark import TaskContext
+
+            context = TaskContext.get()
+            partition_id = context.partitionId()
+            cnt = sum(1 for _ in rows)
+            return SimpleCommitMessage(partition_id=partition_id, count=cnt)
+
+        def commit(self, messages: List[SimpleCommitMessage]) -> None:
+            total_count = sum(message.count for message in messages)
+            print(f"Total number of rows: {total_count}")
+
+        def abort(self, messages: List[SimpleCommitMessage]) -> None:
+            failed_count = sum(message is None for message in messages)
+            print(f"Number of failed tasks: {failed_count}")
+
+
 Implementing Streaming Reader and Writer for Python Data Source
 ---------------------------------------------------------------
 **Implement the Stream Reader**
@@ -267,7 +307,9 @@ After defining your data source, it must be registered before usage.
 
     spark.dataSource.register(FakeDataSource)
 
-Use the fake datasource with the default schema and options:
+**Read From a Python Data Source**
+
+Read from the fake datasource with the default schema and options:
 
 .. code-block:: python
 
@@ -281,7 +323,7 @@ Use the fake datasource with the default schema and options:
     # | Amy Martin|1988-10-28|  68076| Oregon|
     # +-----------+----------+-------+-------+
 
-Use the fake datasource with a custom schema:
+Read from the fake datasource with a custom schema:
 
 .. code-block:: python
 
@@ -295,7 +337,7 @@ Use the fake datasource with a custom schema:
     # |Mrs. Jacqueline Brown|Maynard Inc   |
     # +---------------------+--------------+
 
-Use the fake datasource with a different number of rows:
+Read from the fake datasource with a different number of rows:
 
 .. code-block:: python
 
@@ -311,6 +353,18 @@ Use the fake datasource with a different number of rows:
     # | Douglas James|2007-01-18|  46226|     Alabama|
     # +--------------+----------+-------+------------+
 
+**Write To a Python Data Source**
+
+To write data to a custom location, make sure that you specify the save mode using `mode()`. Supported modes are `append` and `overwrite`.
+
+.. code-block:: python
+
+    df = spark.range(0, 10, 1, 5)
+    df.write.format("fake").mode("append").save()
+
+    # You can check the Spark log (standard error) to see the output of the write operation.
+    # Total number of rows: 10
+
 **Use a Python Data Source in Streaming Query**
 
 Once we register the Python data source, we can also use it in streaming queries as the source of readStream() or the sink of writeStream() by passing its short name or full name to format().
@@ -338,4 +392,4 @@ We can also use the same data source in streaming reader and writer
 
 .. code-block:: python
 
-    query = spark.readStream.format("fake").load().writeStream().format("fake").start("/output_path")
+    query = spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")
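
The writer added in this patch takes part in a three-step protocol: Spark calls `write()` once per partition and collects the returned `WriterCommitMessage` objects, then calls `commit()` with all of them if every task succeeded, or `abort()` if any task failed (tasks that never finished contribute `None`, which is why the example counts `None` entries). Below is a minimal plain-Python sketch of that flow; the driver loop is an illustrative assumption for clarity, not Spark's actual scheduler:

.. code-block:: python

    from dataclasses import dataclass
    from typing import Iterator, List, Optional

    @dataclass
    class SimpleCommitMessage:
        partition_id: int
        count: int

    def write(partition_id: int, rows: Iterator[int]) -> SimpleCommitMessage:
        # One "task" per partition: count the rows it consumes.
        return SimpleCommitMessage(partition_id=partition_id, count=sum(1 for _ in rows))

    def run_write(partitions: List[List[int]]) -> None:
        # Slots start as None so an early failure leaves None for unfinished tasks.
        messages: List[Optional[SimpleCommitMessage]] = [None] * len(partitions)
        try:
            for pid, rows in enumerate(partitions):
                messages[pid] = write(pid, iter(rows))
        except Exception:
            # Mirrors abort(): count the tasks that never produced a message.
            print(f"Number of failed tasks: {sum(m is None for m in messages)}")
            raise
        # Mirrors commit(): aggregate the per-partition counts.
        print(f"Total number of rows: {sum(m.count for m in messages)}")

    run_write([[0, 1], [2, 3, 4], [5, 6, 7, 8, 9]])  # prints "Total number of rows: 10"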

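The user-guide example above writes with `append`; given the `writer(self, schema: StructType, overwrite: bool)` signature added in this patch, `mode("overwrite")` is surfaced to the data source as `overwrite=True`. A short sketch of that path, assuming the `FakeDataSource` registration shown above (the example writer simply ignores the flag):

.. code-block:: python

    # Assumes spark.dataSource.register(FakeDataSource) has already run.
    df = spark.range(0, 10, 1, 5)  # 10 rows split across 5 partitions

    # mode("overwrite") makes Spark call writer(schema, overwrite=True);
    # FakeDataSourceWriter ignores the flag, so the output is unchanged.
    df.write.format("fake").mode("overwrite").save()
    # Spark log (standard error): Total number of rows: 10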

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
