This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3cb6a44a8d91 [SPARK-47734][PYTHON][TESTS] Fix flaky 
DataFrame.writeStream doctest by stopping streaming query
3cb6a44a8d91 is described below

commit 3cb6a44a8d9112fb53a28ccaedf8bbc648cdf92a
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Fri Apr 5 11:14:42 2024 +0900

    [SPARK-47734][PYTHON][TESTS] Fix flaky DataFrame.writeStream doctest by 
stopping streaming query
    
    ### What changes were proposed in this pull request?
    
    This PR deflakes the `pyspark.sql.dataframe.DataFrame.writeStream` doctest.
    
    PR https://github.com/apache/spark/pull/45298 aimed to fix that test but 
misdiagnosed the root issue. The problem is not that concurrent tests were 
colliding on a temporary directory. Rather, the issue is specific to the 
`DataFrame.writeStream` test's logic: that test starts a streaming query 
that writes files to the temporary directory, then exits the temp directory 
context manager without first stopping the streaming query. That creates a race 
condition where the context manager [...]
    
    ```
    File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream
    Failed example:
        with tempfile.TemporaryDirectory() as d:
            # Create a table with Rate source.
            df.writeStream.toTable(
                "my_table", checkpointLocation=d)
    Exception raised:
        Traceback (most recent call last):
          File "/usr/lib/python3.11/doctest.py", line 1353, in __run
            exec(compile(example.source, filename, "single",
          File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line 1, in <module>
            with tempfile.TemporaryDirectory() as d:
          File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
            self.cleanup()
          File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
            self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
          File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
            _rmtree(name, onerror=onerror)
          File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
            onerror(os.rmdir, path, sys.exc_info())
          File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
            os.rmdir(path, dir_fd=dir_fd)
        OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
    ```
    
    In this PR, I update the doctest to properly stop the streaming query.
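
The same pattern can be illustrated without Spark. The following is a minimal sketch, not the doctest itself: a background thread stands in for the streaming query (an assumption made purely for illustration), and the names `write_files` and `run_with_stop` are hypothetical. The writer is stopped and joined before the `with` block exits, so the directory cleanup no longer competes with new files being created.

```python
import os
import tempfile
import threading
import time


def write_files(directory: str, stop_event: threading.Event) -> None:
    """Hypothetical stand-in for a streaming query: create files until stopped."""
    i = 0
    while not stop_event.is_set():
        try:
            with open(os.path.join(directory, f"part-{i}"), "w") as f:
                f.write("data")
        except FileNotFoundError:
            # The directory was removed underneath the writer.
            return
        i += 1
        time.sleep(0.01)


def run_with_stop() -> bool:
    """Stop the writer before leaving the temp directory context manager."""
    stop_event = threading.Event()
    with tempfile.TemporaryDirectory() as d:
        writer = threading.Thread(target=write_files, args=(d, stop_event))
        writer.start()
        time.sleep(0.1)   # let the writer produce a few files
        stop_event.set()  # analogous to query.stop()
        writer.join()     # no writes can happen after this point
    # Cleanup ran with no concurrent writer, so the directory is gone.
    return not os.path.exists(d)
```

If `stop_event.set()` and `writer.join()` were omitted, the cleanup on exit could run while the writer was still adding files, which is the kind of race that can surface as `Directory not empty`.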
    
    ### Why are the changes needed?
    
    Fix flaky test.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test-only. Small user-facing doc change, but one that is consistent 
with other doctest examples.
    
    ### How was this patch tested?
    
    Manually ran updated test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45885 from JoshRosen/fix-flaky-writestream-doctest.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 0107435cb39d68eccf8a6900c3c781665deca38b)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/dataframe.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 7c382ab1c5a5..97f60967da70 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -529,6 +529,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> import time
         >>> import tempfile
         >>> df = spark.readStream.format("rate").load()
         >>> type(df.writeStream)
@@ -536,9 +537,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         >>> with tempfile.TemporaryDirectory() as d:
         ...     # Create a table with Rate source.
-        ...     df.writeStream.toTable(
+        ...     query = df.writeStream.toTable(
         ...         "my_table", checkpointLocation=d)
-        <...streaming.query.StreamingQuery object at 0x...>
+        ...     time.sleep(3)
+        ...     query.stop()
         """
         return DataStreamWriter(self)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
