This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 8d7e3d4af618116b6a3c306456b2001f2ac13c10
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Nov 22 15:36:27 2024 +0900

    Revert "[SPARK-50298][PYTHON][CONNECT] Implement verifySchema parameter of createDataFrame in Spark Connect"
    
    This reverts commit e1477a34d7457a1f31164e4e70e00c9912a8de4b.
---
 python/pyspark/sql/connect/conversion.py           |  6 +--
 python/pyspark/sql/connect/session.py              | 30 ++++----------
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  3 +-
 python/pyspark/sql/tests/test_arrow.py             | 46 +++++++++-------------
 4 files changed, 31 insertions(+), 54 deletions(-)

diff --git a/python/pyspark/sql/connect/conversion.py b/python/pyspark/sql/connect/conversion.py
index f689c439f5f6..d803f37c5b9f 100644
--- a/python/pyspark/sql/connect/conversion.py
+++ b/python/pyspark/sql/connect/conversion.py
@@ -322,7 +322,7 @@ class LocalDataToArrowConversion:
             return lambda value: value
 
     @staticmethod
-    def convert(data: Sequence[Any], schema: StructType, verifySchema: bool = False) -> "pa.Table":
+    def convert(data: Sequence[Any], schema: StructType) -> "pa.Table":
         assert isinstance(data, list) and len(data) > 0
 
         assert schema is not None and isinstance(schema, StructType)
@@ -372,8 +372,8 @@ class LocalDataToArrowConversion:
                 ]
             )
         )
-        table = pa.Table.from_arrays(pylist, schema=pa_schema)
-        return table.cast(pa_schema, safe=verifySchema)
+
+        return pa.Table.from_arrays(pylist, schema=pa_schema)
 
 
 class ArrowTableToRowsConversion:
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index e7292bf8804f..83b0496a8427 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -50,7 +50,6 @@ from pandas.api.types import (  # type: ignore[attr-defined]
 )
 import urllib
 
-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.sql.connect.dataframe import DataFrame
 from pyspark.sql.dataframe import DataFrame as ParentDataFrame
 from pyspark.sql.connect.logging import logger
@@ -450,7 +449,7 @@ class SparkSession:
         data: Union["pd.DataFrame", "np.ndarray", "pa.Table", Iterable[Any]],
         schema: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str, ...]]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: Optional[bool] = None,
     ) -> "ParentDataFrame":
         assert data is not None
         if isinstance(data, DataFrame):
@@ -462,6 +461,9 @@ class SparkSession:
         if samplingRatio is not None:
             warnings.warn("'samplingRatio' is ignored. It is not supported with Spark Connect.")
 
+        if verifySchema is not None:
+            warnings.warn("'verifySchema' is ignored. It is not supported with Spark Connect.")
+
         _schema: Optional[Union[AtomicType, StructType]] = None
         _cols: Optional[List[str]] = None
         _num_cols: Optional[int] = None
@@ -574,10 +576,7 @@ class SparkSession:
                 "spark.sql.session.timeZone", "spark.sql.execution.pandas.convertToArrowArraySafely"
             )
 
-            if verifySchema is _NoValue:
-                verifySchema = safecheck == "true"
-
-            ser = ArrowStreamPandasSerializer(cast(str, timezone), verifySchema)
+            ser = ArrowStreamPandasSerializer(cast(str, timezone), safecheck == "true")
 
             _table = pa.Table.from_batches(
                 [
@@ -597,9 +596,6 @@ class SparkSession:
                 ).cast(arrow_schema)
 
         elif isinstance(data, pa.Table):
-            if verifySchema is _NoValue:
-                verifySchema = False
-
             prefer_timestamp_ntz = is_timestamp_ntz_preferred()
 
             (timezone,) = self._client.get_configs("spark.sql.session.timeZone")
@@ -617,10 +613,7 @@ class SparkSession:
 
             _table = (
                 _check_arrow_table_timestamps_localize(data, schema, True, timezone)
-                .cast(
-                    to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True),
-                    safe=verifySchema,
-                )
+                .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True))
                 .rename_columns(schema.names)
             )
 
@@ -659,12 +652,6 @@ class SparkSession:
             # The _table should already have the proper column names.
             _cols = None
 
-            if verifySchema is not _NoValue:
-                warnings.warn(
-                    "'verifySchema' is ignored. It is not supported"
-                    " with np.ndarray input on Spark Connect."
-                )
-
         else:
             _data = list(data)
 
@@ -696,15 +683,12 @@ class SparkSession:
                         errorClass="CANNOT_DETERMINE_TYPE", messageParameters={}
                     )
 
-            if verifySchema is _NoValue:
-                verifySchema = True
-
             from pyspark.sql.connect.conversion import LocalDataToArrowConversion
 
             # Spark Connect will try its best to build the Arrow table with the
             # inferred schema in the client side, and then rename the columns and
             # cast the datatypes in the server side.
-            _table = LocalDataToArrowConversion.convert(_data, _schema, cast(bool, verifySchema))
+            _table = LocalDataToArrowConversion.convert(_data, _schema)
 
         # TODO: Beside the validation on number of columns, we should also check
         # whether the Arrow Schema is compatible with the user provided Schema.
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index 99d03ad1a440..d47a367a5460 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -137,8 +137,9 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTest
     def test_create_dataframe_namedtuples(self):
         self.check_create_dataframe_namedtuples(True)
 
+    @unittest.skip("Spark Connect does not support verifySchema.")
     def test_createDataFrame_verifySchema(self):
-        self.check_createDataFrame_verifySchema(True)
+        super().test_createDataFrame_verifySchema()
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 99149d1a23d3..19d0db989431 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -533,11 +533,6 @@ class ArrowTestsMixin:
         self.assertEqual(df_arrow.collect(), df_pandas.collect())
 
     def test_createDataFrame_verifySchema(self):
-        for arrow_enabled in [True, False]:
-            with self.subTest(arrow_enabled=arrow_enabled):
-                self.check_createDataFrame_verifySchema(arrow_enabled)
-
-    def check_createDataFrame_verifySchema(self, arrow_enabled):
         data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]}
         # data.value should fail schema validation when verifySchema is True
         schema = StructType(
@@ -552,32 +547,29 @@ class ArrowTestsMixin:
         table = pa.table(data)
         df = self.spark.createDataFrame(table, schema=schema)
         self.assertEqual(df.collect(), expected)
+
         with self.assertRaises(Exception):
             self.spark.createDataFrame(table, schema=schema, verifySchema=True)
 
-        if arrow_enabled:
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
-                # pandas DataFrame with Arrow optimization
-                pdf = pd.DataFrame(data)
+        # pandas DataFrame with Arrow optimization
+        pdf = pd.DataFrame(data)
+        df = self.spark.createDataFrame(pdf, schema=schema)
+        # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
+        # which is false by default
+        self.assertEqual(df.collect(), expected)
+        with self.assertRaises(Exception):
+            with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
                 df = self.spark.createDataFrame(pdf, schema=schema)
-                # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
-                # which is false by default
-                self.assertEqual(df.collect(), expected)
-                with self.assertRaises(Exception):
-                    with self.sql_conf(
-                        {"spark.sql.execution.pandas.convertToArrowArraySafely": True}
-                    ):
-                        df = self.spark.createDataFrame(pdf, schema=schema)
-                with self.assertRaises(Exception):
-                    df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
-        else:
-            # pandas DataFrame without Arrow optimization
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
-                pdf = pd.DataFrame(data)
-                with self.assertRaises(Exception):
-                    self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
-                df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
-                self.assertEqual(df.collect(), expected)
+        with self.assertRaises(Exception):
+            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
+
+        # pandas DataFrame without Arrow optimization
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
+            pdf = pd.DataFrame(data)
+            with self.assertRaises(Exception):
+                df = self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
+            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
+            self.assertEqual(df.collect(), expected)
 
     def _createDataFrame_toggle(self, data, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to