This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit de1c77b9da0198f4e3ce099bea13fcef4a4a8c1f
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Nov 22 15:36:32 2024 +0900

    Revert "[SPARK-50291][PYTHON] Standardize verifySchema parameter of 
createDataFrame in Spark Classic"
    
    This reverts commit aea9e875fba49c5dac230c8c1ce5ee0c53fd4fde.
---
 python/pyspark/sql/pandas/conversion.py            | 37 ++++++--------------
 python/pyspark/sql/session.py                      | 35 ++++++++-----------
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  4 ---
 python/pyspark/sql/tests/test_arrow.py             | 39 ----------------------
 python/pyspark/sql/tests/typing/test_session.yml   | 13 ++++----
 5 files changed, 29 insertions(+), 99 deletions(-)
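
For context, a minimal sketch of the createDataFrame surface this revert
restores (hypothetical local session, not part of the patch): verifySchema is
a plain boolean defaulting to True, rather than a _NoValue sentinel.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # Plain Python rows: per-row verification is honored and on by default.
    df1 = spark.createDataFrame([(1,), (2,)], schema="id int")

    # Opting out is an explicit boolean again, not the removed _NoValue default.
    df2 = spark.createDataFrame([(1,), (2,)], schema="id int", verifySchema=False)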

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 0c612bf4eae3..172a4fc4b234 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -27,7 +27,6 @@ from typing import (
 )
 from warnings import warn
 
-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.errors.exceptions.captured import unwrap_spark_exception
 from pyspark.loose_version import LooseVersion
 from pyspark.util import _load_from_socket
@@ -353,7 +352,7 @@ class SparkConversionMixin:
         self,
         data: "PandasDataFrameLike",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> "DataFrame":
         ...
 
@@ -362,7 +361,7 @@ class SparkConversionMixin:
         self,
         data: "pa.Table",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> "DataFrame":
         ...
 
@@ -371,7 +370,7 @@ class SparkConversionMixin:
         data: Union["PandasDataFrameLike", "pa.Table"],
         schema: Optional[Union[StructType, List[str]]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: bool = True,
     ) -> "DataFrame":
         from pyspark.sql import SparkSession
 
@@ -393,7 +392,7 @@ class SparkConversionMixin:
             if schema is None:
                 schema = data.schema.names
 
-            return self._create_from_arrow_table(data, schema, timezone, verifySchema)
+            return self._create_from_arrow_table(data, schema, timezone)
 
         # `data` is a PandasDataFrameLike object
         from pyspark.sql.pandas.utils import require_minimum_pandas_version
@@ -406,7 +405,7 @@ class SparkConversionMixin:
 
         if self._jconf.arrowPySparkEnabled() and len(data) > 0:
             try:
-                return self._create_from_pandas_with_arrow(data, schema, timezone, verifySchema)
+                return self._create_from_pandas_with_arrow(data, schema, timezone)
             except Exception as e:
                 if self._jconf.arrowPySparkFallbackEnabled():
                     msg = (
@@ -625,11 +624,7 @@ class SparkConversionMixin:
         return np.dtype(record_type_list) if has_rec_fix else None
 
     def _create_from_pandas_with_arrow(
-        self,
-        pdf: "PandasDataFrameLike",
-        schema: Union[StructType, List[str]],
-        timezone: str,
-        verifySchema: Union[_NoValueType, bool],
+        self, pdf: "PandasDataFrameLike", schema: Union[StructType, 
List[str]], timezone: str
     ) -> "DataFrame":
         """
         Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
@@ -662,10 +657,6 @@ class SparkConversionMixin:
         )
         import pyarrow as pa
 
-        if verifySchema is _NoValue:
-            # (With Arrow optimization) createDataFrame with `pandas.DataFrame`
-            verifySchema = self._jconf.arrowSafeTypeConversion()
-
         infer_pandas_dict_as_map = (
             str(self.conf.get("spark.sql.execution.pandas.inferPandasDictAsMap")).lower() == "true"
         )
@@ -734,7 +725,8 @@ class SparkConversionMixin:
 
         jsparkSession = self._jsparkSession
 
-        ser = ArrowStreamPandasSerializer(timezone, verifySchema)
+        safecheck = self._jconf.arrowSafeTypeConversion()
+        ser = ArrowStreamPandasSerializer(timezone, safecheck)
 
         @no_type_check
         def reader_func(temp_filename):
@@ -753,11 +745,7 @@ class SparkConversionMixin:
         return df
 
     def _create_from_arrow_table(
-        self,
-        table: "pa.Table",
-        schema: Union[StructType, List[str]],
-        timezone: str,
-        verifySchema: Union[_NoValueType, bool],
+        self, table: "pa.Table", schema: Union[StructType, List[str]], 
timezone: str
     ) -> "DataFrame":
         """
         Create a DataFrame from a given pyarrow.Table by slicing it into partitions then
@@ -779,10 +767,6 @@ class SparkConversionMixin:
 
         require_minimum_pyarrow_version()
 
-        if verifySchema is _NoValue:
-            # createDataFrame with `pyarrow.Table`
-            verifySchema = False
-
         prefer_timestamp_ntz = is_timestamp_ntz_preferred()
 
         # Create the Spark schema from list of names passed in with Arrow types
@@ -802,8 +786,7 @@ class SparkConversionMixin:
             schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz)
 
         table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast(
-            to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True),
-            safe=verifySchema,
+            to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True)
         )
 
         # Chunk the Arrow Table into RecordBatches
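
The dropped safe=verifySchema argument mapped directly onto PyArrow's
safe-cast switch; with the revert, the table cast uses PyArrow's default
(safe) behavior again. A standalone PyArrow sketch of what that switch
controls, using toy data that is not from the patch:

    import pyarrow as pa

    table = pa.table({"value": [100000000000]})  # does not fit in int32
    target = pa.schema([("value", pa.int32())])

    # An unsafe cast is allowed to overflow and wraps to 1215752192.
    print(table.cast(target, safe=False))

    # The default safe cast raises ArrowInvalid on the same overflow.
    try:
        table.cast(target)
    except pa.ArrowInvalid as e:
        print("rejected:", e)
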
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 7231d6c10b0b..e97b84456410 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -38,7 +38,6 @@ from typing import (
     TYPE_CHECKING,
 )
 
-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.conf import SparkConf
 from pyspark.util import is_remote_only
 from pyspark.sql.conf import RuntimeConfig
@@ -1272,7 +1271,7 @@ class SparkSession(SparkConversionMixin):
         data: Iterable["RowLike"],
         schema: Union[StructType, str],
         *,
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
 
@@ -1282,7 +1281,7 @@ class SparkSession(SparkConversionMixin):
         data: "RDD[RowLike]",
         schema: Union[StructType, str],
         *,
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
 
@@ -1291,7 +1290,7 @@ class SparkSession(SparkConversionMixin):
         self,
         data: "RDD[AtomicValue]",
         schema: Union[AtomicType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
 
@@ -1300,7 +1299,7 @@ class SparkSession(SparkConversionMixin):
         self,
         data: Iterable["AtomicValue"],
         schema: Union[AtomicType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
 
@@ -1319,7 +1318,7 @@ class SparkSession(SparkConversionMixin):
         self,
         data: "PandasDataFrameLike",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
 
@@ -1328,7 +1327,7 @@ class SparkSession(SparkConversionMixin):
         self,
         data: "pa.Table",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
 
@@ -1337,7 +1336,7 @@ class SparkSession(SparkConversionMixin):
         data: Union["RDD[Any]", Iterable[Any], "PandasDataFrameLike", 
"ArrayLike", "pa.Table"],
         schema: Optional[Union[AtomicType, StructType, str]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: bool = True,
     ) -> DataFrame:
         """
         Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`,
@@ -1381,14 +1380,11 @@ class SparkSession(SparkConversionMixin):
             if ``samplingRatio`` is ``None``. This option is effective only when the input is
             :class:`RDD`.
         verifySchema : bool, optional
-            verify data types of every row against schema.
-            If not provided, createDataFrame with
-            - pyarrow.Table, verifySchema=False
-            - pandas.DataFrame with Arrow optimization, verifySchema defaults to
-            `spark.sql.execution.pandas.convertToArrowArraySafely`
-            - pandas.DataFrame without Arrow optimization, verifySchema=True
-            - regular Python instances, verifySchema=True
-            Arrow optimization is enabled/disabled via `spark.sql.execution.arrow.pyspark.enabled`.
+            verify data types of every row against schema. Enabled by default.
+            When the input is :class:`pyarrow.Table` or when the input class is
+            :class:`pandas.DataFrame` and `spark.sql.execution.arrow.pyspark.enabled` is enabled,
+            this option is not effective. It follows Arrow type coercion. This option is not
+            supported with Spark Connect.
 
             .. versionadded:: 2.1.0
 
@@ -1588,13 +1584,8 @@ class SparkSession(SparkConversionMixin):
         data: Union["RDD[Any]", Iterable[Any]],
         schema: Optional[Union[DataType, List[str]]],
         samplingRatio: Optional[float],
-        verifySchema: Union[_NoValueType, bool],
+        verifySchema: bool,
     ) -> DataFrame:
-        if verifySchema is _NoValue:
-            # createDataFrame with regular Python instances
-            # or (without Arrow optimization) createDataFrame with Pandas DataFrame
-            verifySchema = True
-
         if isinstance(schema, StructType):
             verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
 
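With the _NoValue sentinel gone, the classic non-Arrow path simply honors the
boolean via _make_type_verifier. A hedged sketch of the resulting behavior for
plain Python rows (assumes a local session; PySpark's row verifier raises a
ValueError subclass for out-of-range values):

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType

    spark = SparkSession.builder.getOrCreate()
    schema = StructType([StructField("value", IntegerType(), True)])
    rows = [(100000000000,)]  # outside IntegerType's 32-bit range

    # Default verifySchema=True: the value is rejected up front.
    try:
        spark.createDataFrame(rows, schema=schema)
    except ValueError as e:
        print("verification failed:", e)

    # verifySchema=False skips the per-row check.
    df = spark.createDataFrame(rows, schema=schema, verifySchema=False)
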
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index d47a367a5460..885b3001b1db 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -137,10 +137,6 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTest
     def test_create_dataframe_namedtuples(self):
         self.check_create_dataframe_namedtuples(True)
 
-    @unittest.skip("Spark Connect does not support verifySchema.")
-    def test_createDataFrame_verifySchema(self):
-        super().test_createDataFrame_verifySchema()
-
 
 if __name__ == "__main__":
     from pyspark.sql.tests.connect.test_parity_arrow import *  # noqa: F401
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 19d0db989431..b71bdb1eece2 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -532,45 +532,6 @@ class ArrowTestsMixin:
         df_pandas = self.spark.createDataFrame(pdf)
         self.assertEqual(df_arrow.collect(), df_pandas.collect())
 
-    def test_createDataFrame_verifySchema(self):
-        data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 
300000000000]}
-        # data.value should fail schema validation when verifySchema is True
-        schema = StructType(
-            [StructField("id", IntegerType(), True), StructField("value", 
IntegerType(), True)]
-        )
-        expected = [
-            Row(id=1, value=1215752192),
-            Row(id=2, value=-1863462912),
-            Row(id=3, value=-647710720),
-        ]
-        # Arrow table
-        table = pa.table(data)
-        df = self.spark.createDataFrame(table, schema=schema)
-        self.assertEqual(df.collect(), expected)
-
-        with self.assertRaises(Exception):
-            self.spark.createDataFrame(table, schema=schema, verifySchema=True)
-
-        # pandas DataFrame with Arrow optimization
-        pdf = pd.DataFrame(data)
-        df = self.spark.createDataFrame(pdf, schema=schema)
-        # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
-        # which is false by default
-        self.assertEqual(df.collect(), expected)
-        with self.assertRaises(Exception):
-            with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
-                df = self.spark.createDataFrame(pdf, schema=schema)
-        with self.assertRaises(Exception):
-            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
-
-        # pandas DataFrame without Arrow optimization
-        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
-            pdf = pd.DataFrame(data)
-            with self.assertRaises(Exception):
-                df = self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
-            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
-            self.assertEqual(df.collect(), expected)
-
     def _createDataFrame_toggle(self, data, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": 
False}):
             df_no_arrow = self.spark.createDataFrame(data, schema=schema)
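
The deleted test also covered the pandas-with-Arrow path; after the revert
that path is governed by spark.sql.execution.pandas.convertToArrowArraySafely
rather than by verifySchema. A hedged standalone sketch of the same overflow
scenario (assumes a local session):

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
    pdf = pd.DataFrame({"value": [100000000000]})  # overflows int32

    # Safe conversion off (the default): the Arrow cast may silently wrap.
    spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", False)
    print(spark.createDataFrame(pdf, schema="value int").collect())

    # Safe conversion on: the same call fails instead of corrupting values.
    spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", True)
    try:
        spark.createDataFrame(pdf, schema="value int")
    except Exception as e:
        print("rejected:", type(e).__name__)
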
diff --git a/python/pyspark/sql/tests/typing/test_session.yml b/python/pyspark/sql/tests/typing/test_session.yml
index 98587458efe8..d6eee82a7678 100644
--- a/python/pyspark/sql/tests/typing/test_session.yml
+++ b/python/pyspark/sql/tests/typing/test_session.yml
@@ -17,7 +17,6 @@
 
 - case: createDataFrameStructsValid
   main: |
-    from pyspark._globals import _NoValueType
     from pyspark.sql import SparkSession
     from pyspark.sql.types import StructType, StructField, StringType, IntegerType
 
@@ -79,14 +78,14 @@
     main:18: note: Possible overload variants:
     main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[list[str], tuple[str, ...]] = ..., samplingRatio: Optional[float] = ...) -> DataFrame
     main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[list[str], tuple[str, ...]] = ..., samplingRatio: Optional[float] = ...) -> DataFrame
-    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[StructType, str], *, verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[StructType, str], *, verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: RDD[AtomicValue], schema: Union[AtomicType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: Iterable[AtomicValue], schema: Union[AtomicType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
+    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[StructType, str], *, verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[StructType, str], *, verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: RDD[AtomicValue], schema: Union[AtomicType, str], verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: Iterable[AtomicValue], schema: Union[AtomicType, str], verifySchema: bool = ...) -> DataFrame
     main:18: note:     def createDataFrame(self, data: DataFrame, samplingRatio: Optional[float] = ...) -> DataFrame
     main:18: note:     def createDataFrame(self, data: Any, samplingRatio: Optional[float] = ...) -> DataFrame
-    main:18: note:     def createDataFrame(self, data: DataFrame, schema: Union[StructType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def createDataFrame(self, data: Any, schema: Union[StructType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
+    main:18: note:     def createDataFrame(self, data: DataFrame, schema: Union[StructType, str], verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def createDataFrame(self, data: Any, schema: Union[StructType, str], verifySchema: bool = ...) -> DataFrame
 
 - case: createDataFrameFromEmptyRdd
   main: |


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
