This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 455c892 [SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps 455c892 is described below commit 455c8922e2a9f6d2bb70384c5b697bb4dc81b223 Author: Takuya UESHIN <ues...@databricks.com> AuthorDate: Mon Jul 12 11:55:05 2021 +0900 [SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps ### What changes were proposed in this pull request? Properly set `InternalField` more in `DataTypeOps`. ### Why are the changes needed? There are more places in `DataTypeOps` where we can manage `InternalField`. We should manage `InternalField` for these cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #33275 from ueshin/issues/SPARK-36064/fields. Authored-by: Takuya UESHIN <ues...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 95e6c6e3e950b95b2b303d2fd5d859f752e2ca5e) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/pandas/data_type_ops/base.py | 39 +++---- python/pyspark/pandas/data_type_ops/boolean_ops.py | 64 +++++------ .../pandas/data_type_ops/categorical_ops.py | 6 +- python/pyspark/pandas/data_type_ops/date_ops.py | 9 +- .../pyspark/pandas/data_type_ops/datetime_ops.py | 22 ++-- python/pyspark/pandas/data_type_ops/num_ops.py | 43 +++----- python/pyspark/pandas/data_type_ops/string_ops.py | 38 +++++-- .../pandas/tests/data_type_ops/test_boolean_ops.py | 118 +++++++++++++++------ .../pandas/tests/data_type_ops/test_date_ops.py | 2 +- .../pandas/tests/data_type_ops/testing_utils.py | 9 +- 10 files changed, 212 insertions(+), 138 deletions(-) diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py index 71aa943..c79408b 100644 --- a/python/pyspark/pandas/data_type_ops/base.py +++ b/python/pyspark/pandas/data_type_ops/base.py @@ -49,6 +49,7 @@ from pyspark.pandas.typedef.typehints import ( extension_dtypes_available, extension_float_dtypes_available, extension_object_dtypes_available, + spark_type_to_pandas_dtype, ) if extension_dtypes_available: @@ -79,7 +80,7 @@ def is_valid_operand_for_numeric_arithmetic(operand: Any, *, allow_bool: bool = def transform_boolean_operand_to_numeric( - operand: Any, spark_type: Optional[DataType] = None + operand: Any, *, spark_type: Optional[DataType] = None ) -> Any: """Transform boolean operand to numeric. @@ -92,7 +93,14 @@ def transform_boolean_operand_to_numeric( if isinstance(operand, IndexOpsMixin) and isinstance(operand.spark.data_type, BooleanType): assert spark_type, "spark_type must be provided if the operand is a boolean IndexOpsMixin" - return operand.spark.transform(lambda scol: scol.cast(spark_type)) + assert isinstance(spark_type, NumericType), "spark_type must be NumericType" + dtype = spark_type_to_pandas_dtype( + spark_type, use_extension_dtypes=operand._internal.data_fields[0].is_extension_dtype + ) + return operand._with_new_scol( + operand.spark.column.cast(spark_type), + field=operand._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type), + ) elif isinstance(operand, bool): return int(operand) else: @@ -122,7 +130,7 @@ def _as_categorical_type( scol = F.coalesce(map_scol.getItem(index_ops.spark.column), SF.lit(-1)) return index_ops._with_new_scol( - scol.cast(spark_type).alias(index_ops._internal.data_fields[0].name), + scol.cast(spark_type), field=index_ops._internal.data_fields[0].copy( dtype=dtype, spark_type=spark_type, nullable=False ), @@ -131,17 +139,15 @@ def _as_categorical_type( def _as_bool_type(index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike: """Cast `index_ops` to BooleanType Spark type, given `dtype`.""" - from pyspark.pandas.internal import InternalField - + spark_type = BooleanType() if isinstance(dtype, extension_dtypes): - scol = index_ops.spark.column.cast(BooleanType()) + scol = index_ops.spark.column.cast(spark_type) else: scol = F.when(index_ops.spark.column.isNull(), SF.lit(False)).otherwise( - index_ops.spark.column.cast(BooleanType()) + index_ops.spark.column.cast(spark_type) ) return index_ops._with_new_scol( - scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), + scol, field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type) ) @@ -151,16 +157,14 @@ def _as_string_type( """Cast `index_ops` to StringType Spark type, given `dtype` and `null_str`, representing null Spark column. """ - from pyspark.pandas.internal import InternalField - + spark_type = StringType() if isinstance(dtype, extension_dtypes): - scol = index_ops.spark.column.cast(StringType()) + scol = index_ops.spark.column.cast(spark_type) else: - casted = index_ops.spark.column.cast(StringType()) + casted = index_ops.spark.column.cast(spark_type) scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted) return index_ops._with_new_scol( - scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), + scol, field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type) ) @@ -181,10 +185,7 @@ def _as_other_type( assert not need_pre_process, "Pre-processing is needed before the type casting." scol = index_ops.spark.column.cast(spark_type) - return index_ops._with_new_scol( - scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), - ) + return index_ops._with_new_scol(scol, field=InternalField(dtype=dtype)) class DataTypeOps(object, metaclass=ABCMeta): diff --git a/python/pyspark/pandas/data_type_ops/boolean_ops.py b/python/pyspark/pandas/data_type_ops/boolean_ops.py index a9f9239..9ec295e 100644 --- a/python/pyspark/pandas/data_type_ops/boolean_ops.py +++ b/python/pyspark/pandas/data_type_ops/boolean_ops.py @@ -31,10 +31,8 @@ from pyspark.pandas.data_type_ops.base import ( _as_categorical_type, _as_other_type, ) -from pyspark.pandas.internal import InternalField from pyspark.pandas.spark import functions as SF -from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type -from pyspark.pandas.typedef.typehints import as_spark_type +from pyspark.pandas.typedef.typehints import as_spark_type, extension_dtypes, pandas_on_spark_type from pyspark.sql import functions as F from pyspark.sql.column import Column from pyspark.sql.types import BooleanType, StringType @@ -58,14 +56,14 @@ class BooleanOps(DataTypeOps): if isinstance(right, bool): return left.__or__(right) elif isinstance(right, numbers.Number): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left + right else: assert isinstance(right, IndexOpsMixin) if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, BooleanType): return left.__or__(right) else: - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left + right def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -73,12 +71,12 @@ class BooleanOps(DataTypeOps): raise TypeError( "Subtraction can not be applied to %s and the given type." % self.pretty_name ) - if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + if isinstance(right, numbers.Number): + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left - right else: assert isinstance(right, IndexOpsMixin) - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left - right def mul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -89,14 +87,14 @@ class BooleanOps(DataTypeOps): if isinstance(right, bool): return left.__and__(right) elif isinstance(right, numbers.Number): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left * right else: assert isinstance(right, IndexOpsMixin) if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, BooleanType): return left.__and__(right) else: - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left * right def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -104,12 +102,12 @@ class BooleanOps(DataTypeOps): raise TypeError( "True division can not be applied to %s and the given type." % self.pretty_name ) - if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + if isinstance(right, numbers.Number): + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left / right else: assert isinstance(right, IndexOpsMixin) - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left / right def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -117,12 +115,12 @@ class BooleanOps(DataTypeOps): raise TypeError( "Floor division can not be applied to %s and the given type." % self.pretty_name ) - if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + if isinstance(right, numbers.Number): + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left // right else: assert isinstance(right, IndexOpsMixin) - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left // right def mod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -130,12 +128,12 @@ class BooleanOps(DataTypeOps): raise TypeError( "Modulo can not be applied to %s and the given type." % self.pretty_name ) - if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + if isinstance(right, numbers.Number): + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left % right else: assert isinstance(right, IndexOpsMixin) - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left % right def pow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -143,19 +141,19 @@ class BooleanOps(DataTypeOps): raise TypeError( "Exponentiation can not be applied to %s and the given type." % self.pretty_name ) - if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + if isinstance(right, numbers.Number): + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return left ** right else: assert isinstance(right, IndexOpsMixin) - left = transform_boolean_operand_to_numeric(left, right.spark.data_type) + left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type) return left ** right def radd(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, bool): return left.__or__(right) elif isinstance(right, numbers.Number): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right + left else: raise TypeError( @@ -164,7 +162,7 @@ class BooleanOps(DataTypeOps): def rsub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right - left else: raise TypeError( @@ -175,7 +173,7 @@ class BooleanOps(DataTypeOps): if isinstance(right, bool): return left.__and__(right) elif isinstance(right, numbers.Number): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right * left else: raise TypeError( @@ -184,7 +182,7 @@ class BooleanOps(DataTypeOps): def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right / left else: raise TypeError( @@ -193,7 +191,7 @@ class BooleanOps(DataTypeOps): def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right // left else: raise TypeError( @@ -202,7 +200,7 @@ class BooleanOps(DataTypeOps): def rpow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right ** left else: raise TypeError( @@ -211,7 +209,7 @@ class BooleanOps(DataTypeOps): def rmod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, numbers.Number) and not isinstance(right, bool): - left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right)))) + left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right))) return right % left else: raise TypeError( @@ -261,13 +259,17 @@ class BooleanOps(DataTypeOps): index_ops.spark.column.isNotNull(), F.when(index_ops.spark.column, "True").otherwise("False"), ) + nullable = index_ops.spark.nullable else: null_str = str(None) casted = F.when(index_ops.spark.column, "True").otherwise("False") scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted) + nullable = False return index_ops._with_new_scol( - scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), + scol, + field=index_ops._internal.data_fields[0].copy( + dtype=dtype, spark_type=spark_type, nullable=nullable + ), ) else: return _as_other_type(index_ops, dtype, spark_type) diff --git a/python/pyspark/pandas/data_type_ops/categorical_ops.py b/python/pyspark/pandas/data_type_ops/categorical_ops.py index 8c2a27b..9238e6b 100644 --- a/python/pyspark/pandas/data_type_ops/categorical_ops.py +++ b/python/pyspark/pandas/data_type_ops/categorical_ops.py @@ -48,7 +48,7 @@ class CategoricalOps(DataTypeOps): return col.cat.codes def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike: - dtype, spark_type = pandas_on_spark_type(dtype) + dtype, _ = pandas_on_spark_type(dtype) if isinstance(dtype, CategoricalDtype) and dtype.categories is None: return index_ops.copy() @@ -62,9 +62,7 @@ class CategoricalOps(DataTypeOps): ) map_scol = F.create_map(*kvs) scol = map_scol.getItem(index_ops.spark.column) - return index_ops._with_new_scol( - scol.alias(index_ops._internal.data_spark_column_names[0]) - ).astype(dtype) + return index_ops._with_new_scol(scol).astype(dtype) # TODO(SPARK-35997): Implement comparison operators below def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: diff --git a/python/pyspark/pandas/data_type_ops/date_ops.py b/python/pyspark/pandas/data_type_ops/date_ops.py index 86fe8c3..59c8166 100644 --- a/python/pyspark/pandas/data_type_ops/date_ops.py +++ b/python/pyspark/pandas/data_type_ops/date_ops.py @@ -19,6 +19,7 @@ import datetime import warnings from typing import Any, Union +import numpy as np import pandas as pd from pandas.api.types import CategoricalDtype @@ -29,7 +30,6 @@ from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex from pyspark.pandas.base import column_op, IndexOpsMixin from pyspark.pandas.data_type_ops.base import ( DataTypeOps, - _as_bool_type, _as_categorical_type, _as_other_type, _as_string_type, @@ -104,7 +104,12 @@ class DateOps(DataTypeOps): if isinstance(dtype, CategoricalDtype): return _as_categorical_type(index_ops, dtype, spark_type) elif isinstance(spark_type, BooleanType): - return _as_bool_type(index_ops, dtype) + return index_ops._with_new_scol( + index_ops.spark.column.isNotNull(), + field=index_ops._internal.data_fields[0].copy( + dtype=np.dtype(bool), spark_type=spark_type, nullable=False + ), + ) elif isinstance(spark_type, StringType): return _as_string_type(index_ops, dtype, null_str=str(pd.NaT)) else: diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py b/python/pyspark/pandas/data_type_ops/datetime_ops.py index a30dc96..f815742 100644 --- a/python/pyspark/pandas/data_type_ops/datetime_ops.py +++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py @@ -19,11 +19,12 @@ import datetime import warnings from typing import Any, Union, cast +import numpy as np import pandas as pd from pandas.api.types import CategoricalDtype from pyspark.sql import functions as F, Column -from pyspark.sql.types import BooleanType, StringType, TimestampType +from pyspark.sql.types import BooleanType, LongType, StringType, TimestampType from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex from pyspark.pandas.base import IndexOpsMixin @@ -33,9 +34,8 @@ from pyspark.pandas.data_type_ops.base import ( _as_categorical_type, _as_other_type, ) -from pyspark.pandas.internal import InternalField from pyspark.pandas.spark import functions as SF -from pyspark.pandas.typedef import as_spark_type, extension_dtypes, pandas_on_spark_type +from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type class DatetimeOps(DataTypeOps): @@ -62,8 +62,11 @@ class DatetimeOps(DataTypeOps): warnings.warn(msg, UserWarning) return cast( SeriesOrIndex, - left.spark.transform( - lambda scol: scol.astype("long") - SF.lit(right).cast(as_spark_type("long")) + left._with_new_scol( + left.spark.column.cast(LongType()) - SF.lit(right).cast(LongType()), + field=left._internal.data_fields[0].copy( + dtype=np.dtype("int64"), spark_type=LongType() + ), ), ) else: @@ -81,8 +84,11 @@ class DatetimeOps(DataTypeOps): warnings.warn(msg, UserWarning) return cast( SeriesOrIndex, - left.spark.transform( - lambda scol: SF.lit(right).cast(as_spark_type("long")) - scol.astype("long") + left._with_new_scol( + SF.lit(right).cast(LongType()) - left.spark.column.cast(LongType()), + field=left._internal.data_fields[0].copy( + dtype=np.dtype("int64"), spark_type=LongType() + ), ), ) else: @@ -131,7 +137,7 @@ class DatetimeOps(DataTypeOps): scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted) return index_ops._with_new_scol( scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), + field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type), ) else: return _as_other_type(index_ops, dtype, spark_type) diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py index 8b26843..ed089e5 100644 --- a/python/pyspark/pandas/data_type_ops/num_ops.py +++ b/python/pyspark/pandas/data_type_ops/num_ops.py @@ -33,9 +33,8 @@ from pyspark.pandas.data_type_ops.base import ( _as_other_type, _as_string_type, ) -from pyspark.pandas.internal import InternalField from pyspark.pandas.spark import functions as SF -from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type +from pyspark.pandas.typedef.typehints import extension_dtypes, pandas_on_spark_type from pyspark.sql import functions as F from pyspark.sql.column import Column from pyspark.sql.types import ( @@ -55,38 +54,34 @@ class NumericOps(DataTypeOps): if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Addition can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return column_op(Column.__add__)(left, right) def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Subtraction can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return column_op(Column.__sub__)(left, right) def mod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Modulo can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - def mod(left: Column, right: Any) -> Column: return ((left % right) + right) % right + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return column_op(mod)(left, right) def pow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Exponentiation can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - def pow_func(left: Column, right: Any) -> Column: return F.when(left == 1, left).otherwise(Column.__pow__(left, right)) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return column_op(pow_func)(left, right) def radd(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -179,29 +174,25 @@ class IntegralOps(NumericOps): if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Multiplication can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return column_op(Column.__mul__)(left, right) def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("True division can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - def truediv(left: Column, right: Any) -> Column: return F.when( SF.lit(right != 0) | SF.lit(right).isNull(), left.__div__(right) ).otherwise(SF.lit(np.inf).__div__(left)) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(truediv)(left, right) def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Floor division can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - def floordiv(left: Column, right: Any) -> Column: return F.when(SF.lit(right is np.nan), np.nan).otherwise( F.when( @@ -209,6 +200,7 @@ class IntegralOps(NumericOps): ).otherwise(SF.lit(np.inf).__div__(left)) ) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(floordiv)(left, right) def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -220,7 +212,7 @@ class IntegralOps(NumericOps): SF.lit(right).__truediv__(left) ) - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(rtruediv)(left, right) def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -232,7 +224,7 @@ class IntegralOps(NumericOps): F.floor(SF.lit(right).__div__(left)) ) - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(rfloordiv)(left, right) def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike: @@ -262,16 +254,13 @@ class FractionalOps(NumericOps): if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Multiplication can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return column_op(Column.__mul__)(left, right) def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("True division can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - def truediv(left: Column, right: Any) -> Column: return F.when( SF.lit(right != 0) | SF.lit(right).isNull(), left.__div__(right) @@ -281,14 +270,13 @@ class FractionalOps(NumericOps): ) ) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(truediv)(left, right) def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if not is_valid_operand_for_numeric_arithmetic(right): raise TypeError("Floor division can not be applied to given types.") - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) - def floordiv(left: Column, right: Any) -> Column: return F.when(SF.lit(right is np.nan), np.nan).otherwise( F.when( @@ -300,6 +288,7 @@ class FractionalOps(NumericOps): ) ) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(floordiv)(left, right) def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -311,7 +300,7 @@ class FractionalOps(NumericOps): SF.lit(right).__truediv__(left) ) - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(rtruediv)(left, right) def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: @@ -325,7 +314,7 @@ class FractionalOps(NumericOps): ) ) - right = transform_boolean_operand_to_numeric(right, left.spark.data_type) + right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type) return numpy_column_op(rfloordiv)(left, right) def isnull(self, index_ops: IndexOpsLike) -> IndexOpsLike: @@ -351,7 +340,7 @@ class FractionalOps(NumericOps): ).otherwise(index_ops.spark.column.cast(spark_type)) return index_ops._with_new_scol( scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), + field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type), ) elif isinstance(spark_type, StringType): return _as_string_type(index_ops, dtype, null_str=str(np.nan)) diff --git a/python/pyspark/pandas/data_type_ops/string_ops.py b/python/pyspark/pandas/data_type_ops/string_ops.py index 9c6ca4c..b2c4259 100644 --- a/python/pyspark/pandas/data_type_ops/string_ops.py +++ b/python/pyspark/pandas/data_type_ops/string_ops.py @@ -31,7 +31,6 @@ from pyspark.pandas.data_type_ops.base import ( _as_other_type, _as_string_type, ) -from pyspark.pandas.internal import InternalField from pyspark.pandas.spark import functions as SF from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type from pyspark.sql import Column @@ -48,19 +47,31 @@ class StringOps(DataTypeOps): return "strings" def add(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: - if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, StringType): + if isinstance(right, str): + return cast( + SeriesOrIndex, + left._with_new_scol( + F.concat(left.spark.column, SF.lit(right)), field=left._internal.data_fields[0] + ), + ) + elif isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, StringType): return column_op(F.concat)(left, right) - elif isinstance(right, str): - return column_op(F.concat)(left, SF.lit(right)) else: raise TypeError("Addition can not be applied to given types.") def mul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: - if ( + if isinstance(right, int): + return cast( + SeriesOrIndex, + left._with_new_scol( + SF.repeat(left.spark.column, right), field=left._internal.data_fields[0] + ), + ) + elif ( isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, IntegralType) and not isinstance(right.dtype, CategoricalDtype) - ) or isinstance(right, int): + ): return column_op(SF.repeat)(left, right) else: raise TypeError("Multiplication can not be applied to given types.") @@ -69,14 +80,21 @@ class StringOps(DataTypeOps): if isinstance(right, str): return cast( SeriesOrIndex, - left._with_new_scol(F.concat(SF.lit(right), left.spark.column)), # TODO: dtype? + left._with_new_scol( + F.concat(SF.lit(right), left.spark.column), field=left._internal.data_fields[0] + ), ) else: raise TypeError("Addition can not be applied to given types.") def rmul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: if isinstance(right, int): - return column_op(SF.repeat)(left, right) + return cast( + SeriesOrIndex, + left._with_new_scol( + SF.repeat(left.spark.column, right), field=left._internal.data_fields[0] + ), + ) else: raise TypeError("Multiplication can not be applied to given types.") @@ -114,8 +132,8 @@ class StringOps(DataTypeOps): F.length(index_ops.spark.column) > 0 ) return index_ops._with_new_scol( - scol.alias(index_ops._internal.data_spark_column_names[0]), - field=InternalField(dtype=dtype), + scol, + field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type), ) elif isinstance(spark_type, StringType): return _as_string_type(index_ops, dtype) diff --git a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py index fef9fb1..fa37df0 100644 --- a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py +++ b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py @@ -26,7 +26,10 @@ from pandas.api.types import CategoricalDtype from pyspark import pandas as ps from pyspark.pandas.config import option_context from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils -from pyspark.pandas.typedef.typehints import extension_object_dtypes_available +from pyspark.pandas.typedef.typehints import ( + extension_float_dtypes_available, + extension_object_dtypes_available, +) from pyspark.sql.types import BooleanType from pyspark.testing.pandasutils import PandasOnSparkTestCase @@ -399,8 +402,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): def test_add(self): pser = self.pser psser = self.psser - self.assert_eq((pser + 1).astype(float), psser + 1) - self.assert_eq((pser + 0.1).astype(float), psser + 0.1) + self.check_extension(pser + 1, psser + 1) + if extension_float_dtypes_available: + self.check_extension(pser + 0.1, psser + 0.1) + else: + self.assert_eq(pser + 0.1, psser + 0.1) # In pandas, NA | True is NA, whereas NA | True is True in pandas-on-Spark self.check_extension(ps.Series([True, True, True], dtype="boolean"), psser + True) @@ -420,8 +426,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): def test_sub(self): pser = self.pser psser = self.psser - self.assert_eq((pser - 1).astype(float), psser - 1) - self.assert_eq((pser - 0.1).astype(float), psser - 0.1) + self.check_extension(pser - 1, psser - 1) + if extension_float_dtypes_available: + self.check_extension(pser - 0.1, psser - 0.1) + else: + self.assert_eq(pser - 0.1, psser - 0.1) self.assertRaises(TypeError, lambda: psser - psser) self.assertRaises(TypeError, lambda: psser - True) @@ -434,8 +443,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): def test_mul(self): pser = self.pser psser = self.psser - self.assert_eq((pser * 1).astype(float), psser * 1) - self.assert_eq((pser * 0.1).astype(float), psser * 0.1) + self.check_extension(pser * 1, psser * 1) + if extension_float_dtypes_available: + self.check_extension(pser * 0.1, psser * 0.1) + else: + self.assert_eq(pser * 0.1, psser * 0.1) # In pandas, NA & False is NA, whereas NA & False is False in pandas-on-Spark self.check_extension(pser * True, psser * True) @@ -455,8 +467,12 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): def test_truediv(self): pser = self.pser psser = self.psser - self.assert_eq((pser / 1).astype(float), psser / 1) - self.assert_eq((pser / 0.1).astype(float), psser / 0.1) + if extension_float_dtypes_available: + self.check_extension(pser / 1, psser / 1) + self.check_extension(pser / 0.1, psser / 0.1) + else: + self.assert_eq(pser / 1, psser / 1) + self.assert_eq(pser / 0.1, psser / 0.1) self.assertRaises(TypeError, lambda: psser / psser) self.assertRaises(TypeError, lambda: psser / True) @@ -474,7 +490,10 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): psser = self.psser # float is always returned in pandas-on-Spark - self.assert_eq((pser // 1).astype("float"), psser // 1) + if extension_float_dtypes_available: + self.check_extension((pser // 1).astype("Float64"), psser // 1) + else: + self.assert_eq((pser // 1).astype("float"), psser // 1) # in pandas, 1 // 0.1 = 9.0; in pandas-on-Spark, 1 // 0.1 = 10.0 # self.assert_eq(pser // 0.1, psser // 0.1) @@ -494,8 +513,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): def test_mod(self): pser = self.pser psser = self.psser - self.assert_eq((pser % 1).astype(float), psser % 1) - self.assert_eq((pser % 0.1).astype(float), psser % 0.1) + self.check_extension(pser % 1, psser % 1) + if extension_float_dtypes_available: + self.check_extension(pser % 0.1, psser % 0.1) + else: + self.assert_eq(pser % 0.1, psser % 0.1) self.assertRaises(TypeError, lambda: psser % psser) self.assertRaises(TypeError, lambda: psser % True) @@ -509,9 +531,18 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): pser = self.pser psser = self.psser # float is always returned in pandas-on-Spark - self.assert_eq((pser ** 1).astype("float"), psser ** 1) - self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1) - self.assert_eq((pser ** pser.astype(float)).astype("float"), psser ** psser.astype(float)) + if extension_float_dtypes_available: + self.check_extension((pser ** 1).astype("Float64"), psser ** 1) + self.check_extension((pser ** 0.1).astype("Float64"), self.psser ** 0.1) + self.check_extension( + (pser ** pser.astype(float)).astype("Float64"), psser ** psser.astype(float) + ) + else: + self.assert_eq((pser ** 1).astype("float"), psser ** 1) + self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1) + self.assert_eq( + (pser ** pser.astype(float)).astype("float"), psser ** psser.astype(float) + ) self.assertRaises(TypeError, lambda: psser ** psser) self.assertRaises(TypeError, lambda: psser ** True) @@ -526,8 +557,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): self.assertRaises(TypeError, lambda: self.psser ** psser) def test_radd(self): - self.assert_eq((1 + self.pser).astype(float), 1 + self.psser) - self.assert_eq((0.1 + self.pser).astype(float), 0.1 + self.psser) + self.check_extension(1 + self.pser, 1 + self.psser) + if extension_float_dtypes_available: + self.check_extension(0.1 + self.pser, 0.1 + self.psser) + else: + self.assert_eq(0.1 + self.pser, 0.1 + self.psser) self.assertRaises(TypeError, lambda: "x" + self.psser) # In pandas, NA | True is NA, whereas NA | True is True in pandas-on-Spark @@ -538,16 +572,22 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) + self.psser) def test_rsub(self): - self.assert_eq((1 - self.pser).astype(float), 1 - self.psser) - self.assert_eq((0.1 - self.pser).astype(float), 0.1 - self.psser) + self.check_extension(1 - self.pser, 1 - self.psser) + if extension_float_dtypes_available: + self.check_extension(0.1 - self.pser, 0.1 - self.psser) + else: + self.assert_eq(0.1 - self.pser, 0.1 - self.psser) self.assertRaises(TypeError, lambda: "x" - self.psser) self.assertRaises(TypeError, lambda: True - self.psser) self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - self.psser) self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) - self.psser) def test_rmul(self): - self.assert_eq((1 * self.pser).astype(float), 1 * self.psser) - self.assert_eq((0.1 * self.pser).astype(float), 0.1 * self.psser) + self.check_extension(1 * self.pser, 1 * self.psser) + if extension_float_dtypes_available: + self.check_extension(0.1 * self.pser, 0.1 * self.psser) + else: + self.assert_eq(0.1 * self.pser, 0.1 * self.psser) self.assertRaises(TypeError, lambda: "x" * self.psser) # In pandas, NA & False is NA, whereas NA & False is False in pandas-on-Spark @@ -558,35 +598,49 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils): self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) * self.psser) def test_rtruediv(self): - self.assert_eq((1 / self.pser).astype(float), 1 / self.psser) - self.assert_eq((0.1 / self.pser).astype(float), 0.1 / self.psser) + if extension_float_dtypes_available: + self.check_extension(1 / self.pser, 1 / self.psser) + self.check_extension(0.1 / self.pser, 0.1 / self.psser) + else: + self.assert_eq(1 / self.pser, 1 / self.psser) + self.assert_eq(0.1 / self.pser, 0.1 / self.psser) self.assertRaises(TypeError, lambda: "x" / self.psser) self.assertRaises(TypeError, lambda: True / self.psser) self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) / self.psser) self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) / self.psser) def test_rfloordiv(self): - self.assert_eq((1 // self.psser).astype(float), ps.Series([1.0, np.inf, np.nan])) - self.assert_eq((0.1 // self.psser).astype(float), ps.Series([0.0, np.inf, np.nan])) + self.assert_eq(pd.Series([1.0, np.inf, np.nan]), (1 // self.psser).astype(float)) + self.assert_eq(pd.Series([0.0, np.inf, np.nan]), (0.1 // self.psser).astype(float)) self.assertRaises(TypeError, lambda: "x" // self.psser) self.assertRaises(TypeError, lambda: True // self.psser) self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) // self.psser) self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) // self.psser) def test_rpow(self): - self.assert_eq(1 ** self.psser, ps.Series([1, 1, 1], dtype=float)) - self.assert_eq((0.1 ** self.pser).astype(float), 0.1 ** self.psser) + if extension_float_dtypes_available: + self.check_extension(pd.Series([1, 1, 1], dtype="Float64"), 1 ** self.psser) + self.check_extension((0.1 ** self.pser).astype("Float64"), 0.1 ** self.psser) + else: + self.assert_eq(pd.Series([1, 1, 1], dtype="float"), 1 ** self.psser) + self.assert_eq((0.1 ** self.pser).astype("float"), 0.1 ** self.psser) self.assertRaises(TypeError, lambda: "x" ** self.psser) self.assertRaises(TypeError, lambda: True ** self.psser) self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** self.psser) self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** self.psser) def test_rmod(self): - self.assert_eq(ps.Series([0, np.nan, np.nan], dtype=float), 1 % self.psser) - self.assert_eq( - ps.Series([0.10000000000000009, np.nan, np.nan], dtype=float), - 0.1 % self.psser, - ) + self.check_extension(ps.Series([0, np.nan, np.nan], dtype="Int64"), 1 % self.psser) + if extension_float_dtypes_available: + self.check_extension( + pd.Series([0.10000000000000009, np.nan, np.nan], dtype="Float64"), + 0.1 % self.psser, + ) + else: + self.assert_eq( + pd.Series([0.10000000000000009, np.nan, np.nan], dtype="float"), + 0.1 % self.psser, + ) self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % self.psser) self.assertRaises(TypeError, lambda: True % self.psser) diff --git a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py index edfe806..1574ebf 100644 --- a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py +++ b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py @@ -190,7 +190,7 @@ class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils): pser = self.pser psser = self.psser self.assert_eq(pser.astype(str), psser.astype(str)) - self.assert_eq(pd.Series([None, None, None]), psser.astype(bool)) + self.assert_eq(pser.astype(bool), psser.astype(bool)) cat_type = CategoricalDtype(categories=["a", "b", "c"]) self.assert_eq(pser.astype(cat_type), psser.astype(cat_type)) diff --git a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py index fc843c4..4bda305 100644 --- a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py +++ b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py @@ -164,7 +164,7 @@ class TestCasesUtils(object): + self.integral_extension_dtypes ) - def check_extension(self, psser, pser): + def check_extension(self, left, right): """ Compare `psser` and `pser` of numeric ExtensionDtypes. @@ -172,7 +172,8 @@ class TestCasesUtils(object): pandas versions. Please refer to https://github.com/pandas-dev/pandas/issues/39410. """ if LooseVersion("1.1") <= LooseVersion(pd.__version__) < LooseVersion("1.2.2"): - self.assert_eq(psser, pser, check_exact=False) - self.assertTrue(isinstance(psser.dtype, extension_dtypes)) + self.assert_eq(left, right, check_exact=False) + self.assertTrue(isinstance(left.dtype, extension_dtypes)) + self.assertTrue(isinstance(right.dtype, extension_dtypes)) else: - self.assert_eq(psser, pser) + self.assert_eq(left, right) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org