This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
commit 8d7e3d4af618116b6a3c306456b2001f2ac13c10
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Nov 22 15:36:27 2024 +0900

    Revert "[SPARK-50298][PYTHON][CONNECT] Implement verifySchema parameter of createDataFrame in Spark Connect"

    This reverts commit e1477a34d7457a1f31164e4e70e00c9912a8de4b.
---
 python/pyspark/sql/connect/conversion.py           |  6 +--
 python/pyspark/sql/connect/session.py              | 30 ++++----------
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  3 +-
 python/pyspark/sql/tests/test_arrow.py             | 46 +++++++++-------------
 4 files changed, 31 insertions(+), 54 deletions(-)

diff --git a/python/pyspark/sql/connect/conversion.py b/python/pyspark/sql/connect/conversion.py
index f689c439f5f6..d803f37c5b9f 100644
--- a/python/pyspark/sql/connect/conversion.py
+++ b/python/pyspark/sql/connect/conversion.py
@@ -322,7 +322,7 @@ class LocalDataToArrowConversion:
             return lambda value: value
 
     @staticmethod
-    def convert(data: Sequence[Any], schema: StructType, verifySchema: bool = False) -> "pa.Table":
+    def convert(data: Sequence[Any], schema: StructType) -> "pa.Table":
         assert isinstance(data, list) and len(data) > 0
 
         assert schema is not None and isinstance(schema, StructType)
@@ -372,8 +372,8 @@ class LocalDataToArrowConversion:
                 ]
             )
         )
-        table = pa.Table.from_arrays(pylist, schema=pa_schema)
-        return table.cast(pa_schema, safe=verifySchema)
+
+        return pa.Table.from_arrays(pylist, schema=pa_schema)
 
 
 class ArrowTableToRowsConversion:
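The `convert` change above drops a client-side safe cast: `table.cast(pa_schema, safe=verifySchema)` used PyArrow's checked-cast mode, which raises on lossy conversions instead of silently truncating; after the revert the table is built with `pa.Table.from_arrays` alone and datatype mismatches are left to the server-side cast. A minimal standalone sketch of the PyArrow behavior (the table and schema below are illustrative, not taken from the Spark sources):

    import pyarrow as pa

    # int64 column whose values do not fit into int32
    table = pa.table({"value": [100000000000]})
    target = pa.schema([pa.field("value", pa.int32())])

    table.cast(target, safe=False)  # unchecked cast: overflows silently
    table.cast(target, safe=True)   # raises pyarrow.lib.ArrowInvalid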
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index e7292bf8804f..83b0496a8427 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -50,7 +50,6 @@ from pandas.api.types import (  # type: ignore[attr-defined]
 )
 import urllib
 
-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.sql.connect.dataframe import DataFrame
 from pyspark.sql.dataframe import DataFrame as ParentDataFrame
 from pyspark.sql.connect.logging import logger
@@ -450,7 +449,7 @@ class SparkSession:
         data: Union["pd.DataFrame", "np.ndarray", "pa.Table", Iterable[Any]],
         schema: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str, ...]]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: Optional[bool] = None,
     ) -> "ParentDataFrame":
         assert data is not None
         if isinstance(data, DataFrame):
@@ -462,6 +461,9 @@ class SparkSession:
         if samplingRatio is not None:
             warnings.warn("'samplingRatio' is ignored. It is not supported with Spark Connect.")
 
+        if verifySchema is not None:
+            warnings.warn("'verifySchema' is ignored. It is not supported with Spark Connect.")
+
         _schema: Optional[Union[AtomicType, StructType]] = None
         _cols: Optional[List[str]] = None
         _num_cols: Optional[int] = None
@@ -574,10 +576,7 @@ class SparkSession:
                 "spark.sql.session.timeZone", "spark.sql.execution.pandas.convertToArrowArraySafely"
             )
 
-            if verifySchema is _NoValue:
-                verifySchema = safecheck == "true"
-
-            ser = ArrowStreamPandasSerializer(cast(str, timezone), verifySchema)
+            ser = ArrowStreamPandasSerializer(cast(str, timezone), safecheck == "true")
 
             _table = pa.Table.from_batches(
                 [
@@ -597,9 +596,6 @@ class SparkSession:
             ).cast(arrow_schema)
 
         elif isinstance(data, pa.Table):
-            if verifySchema is _NoValue:
-                verifySchema = False
-
             prefer_timestamp_ntz = is_timestamp_ntz_preferred()
 
             (timezone,) = self._client.get_configs("spark.sql.session.timeZone")
@@ -617,10 +613,7 @@ class SparkSession:
 
             _table = (
                 _check_arrow_table_timestamps_localize(data, schema, True, timezone)
-                .cast(
-                    to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True),
-                    safe=verifySchema,
-                )
+                .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True))
                 .rename_columns(schema.names)
             )
 
@@ -659,12 +652,6 @@ class SparkSession:
             # The _table should already have the proper column names.
             _cols = None
 
-            if verifySchema is not _NoValue:
-                warnings.warn(
-                    "'verifySchema' is ignored. It is not supported"
-                    " with np.ndarray input on Spark Connect."
-                )
-
         else:
             _data = list(data)
 
@@ -696,15 +683,12 @@ class SparkSession:
                     errorClass="CANNOT_DETERMINE_TYPE", messageParameters={}
                 )
 
-            if verifySchema is _NoValue:
-                verifySchema = True
-
             from pyspark.sql.connect.conversion import LocalDataToArrowConversion
 
             # Spark Connect will try its best to build the Arrow table with the
             # inferred schema in the client side, and then rename the columns and
             # cast the datatypes in the server side.
-            _table = LocalDataToArrowConversion.convert(_data, _schema, cast(bool, verifySchema))
+            _table = LocalDataToArrowConversion.convert(_data, _schema)
 
             # TODO: Beside the validation on number of columns, we should also check
             # whether the Arrow Schema is compatible with the user provided Schema.
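With this revert, Spark Connect warns that `verifySchema` is ignored, while classic PySpark keeps the semantics that test_arrow.py below exercises: without Arrow the parameter defaults to True and verification raises on out-of-range values, and `verifySchema=False` skips the check. A rough sketch of that classic-mode behavior, assuming a local session and an illustrative single-column schema (the real test schema is not shown in this diff):

    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, StructField, StructType

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)

    schema = StructType([StructField("value", IntegerType(), True)])
    rows = [(100000000000,)]  # too large for 32-bit IntegerType

    try:
        # verifySchema defaults to True here, so this should raise
        spark.createDataFrame(rows, schema=schema)
    except Exception as e:
        print("schema verification failed:", type(e).__name__)

    # skipping verification accepts the rows unchecked
    df = spark.createDataFrame(rows, schema=schema, verifySchema=False)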
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index 99d03ad1a440..d47a367a5460 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -137,8 +137,9 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTest
     def test_create_dataframe_namedtuples(self):
         self.check_create_dataframe_namedtuples(True)
 
+    @unittest.skip("Spark Connect does not support verifySchema.")
     def test_createDataFrame_verifySchema(self):
-        self.check_createDataFrame_verifySchema(True)
+        super().test_createDataFrame_verifySchema()
 
 
 if __name__ == "__main__":
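The parity override above keeps the inherited test name but skips it, so the shared implementation in ArrowTestsMixin now runs only for classic Spark. A small self-contained sketch of this skip-and-delegate pattern (class and test names are illustrative):

    import unittest

    class BaseTests(unittest.TestCase):
        def test_feature(self):
            self.assertTrue(True)  # shared implementation

    class ParityTests(BaseTests):
        @unittest.skip("Not supported on this backend.")
        def test_feature(self):
            super().test_feature()  # never runs; the override carries the skip reason

    if __name__ == "__main__":
        unittest.main()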
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 99149d1a23d3..19d0db989431 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -533,11 +533,6 @@ class ArrowTestsMixin:
             self.assertEqual(df_arrow.collect(), df_pandas.collect())
 
     def test_createDataFrame_verifySchema(self):
-        for arrow_enabled in [True, False]:
-            with self.subTest(arrow_enabled=arrow_enabled):
-                self.check_createDataFrame_verifySchema(arrow_enabled)
-
-    def check_createDataFrame_verifySchema(self, arrow_enabled):
         data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]}
         # data.value should fail schema validation when verifySchema is True
         schema = StructType(
@@ -552,32 +547,29 @@ class ArrowTestsMixin:
         table = pa.table(data)
         df = self.spark.createDataFrame(table, schema=schema)
         self.assertEqual(df.collect(), expected)
+
         with self.assertRaises(Exception):
             self.spark.createDataFrame(table, schema=schema, verifySchema=True)
 
-        if arrow_enabled:
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
-                # pandas DataFrame with Arrow optimization
-                pdf = pd.DataFrame(data)
+        # pandas DataFrame with Arrow optimization
+        pdf = pd.DataFrame(data)
+        df = self.spark.createDataFrame(pdf, schema=schema)
+        # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
+        # which is false by default
+        self.assertEqual(df.collect(), expected)
+        with self.assertRaises(Exception):
+            with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
                 df = self.spark.createDataFrame(pdf, schema=schema)
-                # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
-                # which is false by default
-                self.assertEqual(df.collect(), expected)
-                with self.assertRaises(Exception):
-                    with self.sql_conf(
-                        {"spark.sql.execution.pandas.convertToArrowArraySafely": True}
-                    ):
-                        df = self.spark.createDataFrame(pdf, schema=schema)
-                with self.assertRaises(Exception):
-                    df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
-        else:
-            # pandas DataFrame without Arrow optimization
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
-                pdf = pd.DataFrame(data)
-                with self.assertRaises(Exception):
-                    self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
-                df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
-                self.assertEqual(df.collect(), expected)
+        with self.assertRaises(Exception):
+            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
+
+        # pandas DataFrame without Arrow optimization
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
+            pdf = pd.DataFrame(data)
+            with self.assertRaises(Exception):
+                df = self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
+            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
+            self.assertEqual(df.collect(), expected)
 
     def _createDataFrame_toggle(self, data, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
