This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 84addc5d1d8 [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py` 84addc5d1d8 is described below commit 84addc5d1d8359a5b716ec869489fc961af23cf2 Author: Xinrong Meng <xinrong.m...@databricks.com> AuthorDate: Thu Apr 28 09:17:24 2022 +0900 [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py` Minor refactoring of `python/pyspark/sql/pandas/conversion.py`, which includes: - doc changes - renaming. Why the changes are needed: to improve code readability and maintainability. User-facing change: no. How this was tested: existing tests. Closes #36384 from xinrong-databricks/conversion.py. Authored-by: Xinrong Meng <xinrong.m...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit c19fadabde3ef3f9c7e4fa9bf74632a4f8e1f3e2) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/pandas/conversion.py | 52 +++++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 7153450d2bc..808444f1e2e 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -51,7 +51,7 @@ if TYPE_CHECKING: class PandasConversionMixin: """ - Min-in for the conversion from Spark to pandas. Currently, only :class:`DataFrame` + Mix-in for the conversion from Spark to pandas. Currently, only :class:`DataFrame` can use this class. """ @@ -65,10 +65,10 @@ class PandasConversionMixin: Notes ----- - This method should only be used if the resulting Pandas's :class:`DataFrame` is + This method should only be used if the resulting Pandas ``pandas.DataFrame`` is expected to be small, as all the data is loaded into the driver's memory. - Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental. + Usage with ``spark.sql.execution.arrow.pyspark.enabled=True`` is experimental. 
Examples -------- @@ -136,8 +136,7 @@ class PandasConversionMixin: # Rename columns to avoid duplicated column names. tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))] - c = self.sparkSession._jconf - self_destruct = c.arrowPySparkSelfDestructEnabled() + self_destruct = jconf.arrowPySparkSelfDestructEnabled() batches = self.toDF(*tmp_column_names)._collect_as_arrow( split_batches=self_destruct ) @@ -176,11 +175,11 @@ class PandasConversionMixin: else: corrected_panda_types = {} for index, field in enumerate(self.schema): - panda_type = PandasConversionMixin._to_corrected_pandas_type( + pandas_type = PandasConversionMixin._to_corrected_pandas_type( field.dataType ) corrected_panda_types[tmp_column_names[index]] = ( - np.object0 if panda_type is None else panda_type + np.object0 if pandas_type is None else pandas_type ) pdf = pd.DataFrame(columns=tmp_column_names).astype( @@ -206,36 +205,37 @@ class PandasConversionMixin: pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) column_counter = Counter(self.columns) - dtype: List[Optional[Type]] = [None] * len(self.schema) - for fieldIdx, field in enumerate(self.schema): - # For duplicate column name, we use `iloc` to access it. + corrected_dtypes: List[Optional[Type]] = [None] * len(self.schema) + for index, field in enumerate(self.schema): + # We use `iloc` to access columns with duplicate column names. if column_counter[field.name] > 1: - pandas_col = pdf.iloc[:, fieldIdx] + pandas_col = pdf.iloc[:, index] else: pandas_col = pdf[field.name] pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType) # SPARK-21766: if an integer field is nullable and has null values, it can be - # inferred by pandas as float column. Once we convert the column with NaN back - # to integer type e.g., np.int16, we will hit exception. So we use the inferred - # float type, not the corrected type from the schema in this case. + # inferred by pandas as a float column. 
If we convert the column with NaN back + # to integer type e.g., np.int16, we will hit an exception. So we use the + # pandas-inferred float type, rather than the corrected type from the schema + # in this case. if pandas_type is not None and not ( isinstance(field.dataType, IntegralType) and field.nullable and pandas_col.isnull().any() ): - dtype[fieldIdx] = pandas_type - # Ensure we fall back to nullable numpy types, even when whole column is null: + corrected_dtypes[index] = pandas_type + # Ensure we fall back to nullable numpy types. if isinstance(field.dataType, IntegralType) and pandas_col.isnull().any(): - dtype[fieldIdx] = np.float64 + corrected_dtypes[index] = np.float64 if isinstance(field.dataType, BooleanType) and pandas_col.isnull().any(): - dtype[fieldIdx] = np.object # type: ignore[attr-defined] + corrected_dtypes[index] = np.object # type: ignore[attr-defined] df = pd.DataFrame() - for index, t in enumerate(dtype): + for index, t in enumerate(corrected_dtypes): column_name = self.schema[index].name - # For duplicate column name, we use `iloc` to access it. + # We use `iloc` to access columns with duplicate column names. if column_counter[column_name] > 1: series = pdf.iloc[:, index] else: @@ -255,25 +255,23 @@ class PandasConversionMixin: else: df[column_name] = series - pdf = df - if timezone is None: - return pdf + return df else: from pyspark.sql.pandas.types import _check_series_convert_timestamps_local_tz for field in self.schema: # TODO: handle nested timestamps, such as ArrayType(TimestampType())? 
if isinstance(field.dataType, TimestampType): - pdf[field.name] = _check_series_convert_timestamps_local_tz( - pdf[field.name], timezone + df[field.name] = _check_series_convert_timestamps_local_tz( + df[field.name], timezone ) - return pdf + return df @staticmethod def _to_corrected_pandas_type(dt: DataType) -> Optional[Type]: """ - When converting Spark SQL records to Pandas :class:`DataFrame`, the inferred data type + When converting Spark SQL records to Pandas `pandas.DataFrame`, the inferred data type may be wrong. This method gets the corrected data type for Pandas if that type may be inferred incorrectly. """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org