This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 660a9f845f9 [SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using 
TaskContext to stop iterator on task completion
660a9f845f9 is described below

commit 660a9f845f954b4bf2c3a7d51988b33ae94e3207
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Tue May 3 08:30:05 2022 +0900

    [SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop 
iterator on task completion
    
    This PR fixes the issue described in 
https://issues.apache.org/jira/browse/SPARK-39084 where calling 
`df.rdd.isEmpty()` on a particular dataset could result in a JVM crash and/or 
executor failure.
    
    The issue was caused by the Python iterator not being synchronised with the 
Java iterator: when the task completes, the Python iterator continues to 
process data. ContextAwareIterator was introduced as part of 
https://issues.apache.org/jira/browse/SPARK-33277 but it was not applied in all 
of the places where it should be used.
    
    Fixes the JVM crash when checking isEmpty() on a dataset.
    
    No user-facing change.
    
    I added a test case that reproduces the issue 100% of the time. I confirmed 
that the test fails without the fix and passes with the fix.
    
    Closes #36425 from sadikovi/fix-pyspark-iter-2.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 9305cc744d27daa6a746d3eb30e7639c63329072)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_dataframe.py         | 81 ++++++++++++++++++++++
 .../sql/execution/python/EvaluatePython.scala      |  3 +-
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py 
b/python/pyspark/sql/tests/test_dataframe.py
index e3977e81851..dfdbcb912f7 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -21,6 +21,7 @@ import shutil
 import tempfile
 import time
 import unittest
+import uuid
 
 from pyspark.sql import SparkSession, Row
 from pyspark.sql.types import StringType, IntegerType, DoubleType, StructType, 
StructField, \
@@ -837,6 +838,86 @@ class DataFrameTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(tpath)
 
+<<<<<<< HEAD
+=======
+    def test_df_show(self):
+        # SPARK-35408: ensure better diagnostics if incorrect parameters are 
passed
+        # to DataFrame.show
+
+        df = self.spark.createDataFrame([("foo",)])
+        df.show(5)
+        df.show(5, True)
+        df.show(5, 1, True)
+        df.show(n=5, truncate="1", vertical=False)
+        df.show(n=5, truncate=1.5, vertical=False)
+
+        with self.assertRaisesRegex(TypeError, "Parameter 'n'"):
+            df.show(True)
+        with self.assertRaisesRegex(TypeError, "Parameter 'vertical'"):
+            df.show(vertical="foo")
+        with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"):
+            df.show(truncate="foo")
+
+    def test_df_is_empty(self):
+        # SPARK-39084: Fix df.rdd.isEmpty() resulting in JVM crash.
+
+        # This particular example of DataFrame reproduces an issue in isEmpty 
call
+        # which could result in JVM crash.
+        data = []
+        for t in range(0, 10000):
+            id = str(uuid.uuid4())
+            if t == 0:
+                for i in range(0, 99):
+                    data.append((id,))
+            elif t < 10:
+                for i in range(0, 75):
+                    data.append((id,))
+            elif t < 100:
+                for i in range(0, 50):
+                    data.append((id,))
+            elif t < 1000:
+                for i in range(0, 25):
+                    data.append((id,))
+            else:
+                for i in range(0, 10):
+                    data.append((id,))
+
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        try:
+            df = self.spark.createDataFrame(data, ["col"])
+            df.coalesce(1).write.parquet(tmpPath)
+
+            res = self.spark.read.parquet(tmpPath).groupBy("col").count()
+            self.assertFalse(res.rdd.isEmpty())
+        finally:
+            shutil.rmtree(tmpPath)
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_pandas_api(self):
+        import pandas as pd
+        from pandas.testing import assert_frame_equal
+
+        sdf = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], 
["Col1", "Col2"])
+        psdf_from_sdf = sdf.pandas_api()
+        psdf_from_sdf_with_index = sdf.pandas_api(index_col="Col1")
+        pdf = pd.DataFrame({"Col1": ["a", "b", "c"], "Col2": [1, 2, 3]})
+        pdf_with_index = pdf.set_index("Col1")
+
+        assert_frame_equal(pdf, psdf_from_sdf.to_pandas())
+        assert_frame_equal(pdf_with_index, 
psdf_from_sdf_with_index.to_pandas())
+
+    # test for SPARK-36337
+    def test_create_nan_decimal_dataframe(self):
+        self.assertEqual(
+            self.spark.createDataFrame(data=[Decimal("NaN")], 
schema="decimal").collect(),
+            [Row(value=None)],
+        )
+
+>>>>>>> 9305cc744d2 ([SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using 
TaskContext to stop iterator on task completion)
 
 class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
     # These tests are separate because it uses 
'spark.sql.queryExecutionListeners' which is
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 7fe32636308..ca33f6951e1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
 
+import org.apache.spark.{ContextAwareIterator, TaskContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -300,7 +301,7 @@ object EvaluatePython {
   def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = {
     rdd.mapPartitions { iter =>
       registerPicklers()  // let it called in executor
-      new SerDeUtil.AutoBatchedPickler(iter)
+      new SerDeUtil.AutoBatchedPickler(new 
ContextAwareIterator(TaskContext.get, iter))
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to