This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9bc9fac8c5e9 [SPARK-55334][PYTHON] Enable `TimestampType` and `TimestampNTZType` in `convert_numpy`
9bc9fac8c5e9 is described below
commit 9bc9fac8c5e91bb16913ed2da91f2afac3cc925d
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Feb 6 17:40:34 2026 +0800
[SPARK-55334][PYTHON] Enable `TimestampType` and `TimestampNTZType` in `convert_numpy`
### What changes were proposed in this pull request?
Enable `TimestampType` and `TimestampNTZType` in `convert_numpy`, via a new `ArrowArrayConversion.preprocess_time` helper that drops the timezone and coerces timestamp time units to nanoseconds before the pandas conversion.
### Why are the changes needed?
To replace the old conversion path, which relied on the `coerce_temporal_nanoseconds` option of `to_pandas`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
### Was this patch authored or co-authored using generative AI tooling?
No.
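For illustration, here is a minimal standalone sketch (not part of the patch; the sample value and variable names are assumptions) of the two preprocessing steps that `preprocess_time` applies to timestamp arrays:

    # Illustrative sketch: drop the time zone, then coerce the unit to
    # nanoseconds, mirroring what preprocess_time does below.
    import pyarrow as pa
    import pyarrow.compute as pc

    # A tz-aware, microsecond-unit timestamp array (sample value assumed).
    arr = pa.array([1_700_000_000_000_000], type=pa.timestamp("us", tz="UTC"))

    # 1. Drop the time zone by converting to local wall-clock time.
    arr = pc.local_timestamp(arr)

    # 2. Coerce the time unit to nanoseconds.
    arr = arr.cast(pa.timestamp("ns", tz=None))

    print(arr.type)  # timestamp[ns]

After this step a plain `to_pandas()` already produces nanosecond-precision values, which is why the `coerce_temporal_nanoseconds` option can be removed in the diff below.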
Closes #54157 from zhengruifeng/fix_coerce_temporal_nanoseconds.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/conversion.py | 94 +++++++++++++++++++++++++++++-----------
1 file changed, 69 insertions(+), 25 deletions(-)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index 4da02fd78858..11b3b195a9d1 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -1129,6 +1129,39 @@ class ArrowArrayConversion:
             convert=convert_func,
         )
 
+    @classmethod
+    def preprocess_time(
+        cls,
+        arr: Union["pa.Array", "pa.ChunkedArray"],
+    ) -> Union["pa.Array", "pa.ChunkedArray"]:
+        """
+        1, always drop the timezone from TimestampType;
+        2, coerce_temporal_nanoseconds: coerce timestamp time units to nanoseconds
+        """
+        import pyarrow as pa
+        import pyarrow.types as types
+        import pyarrow.compute as pc
+
+        def check_type_func(pa_type: pa.DataType) -> bool:
+            return types.is_timestamp(pa_type) and (pa_type.unit != "ns" or pa_type.tz is not None)
+
+        def convert_func(arr: pa.Array) -> pa.Array:
+            assert isinstance(arr, pa.TimestampArray)
+
+            pa_type = arr.type
+
+            if pa_type.tz is not None:
+                arr = pc.local_timestamp(arr)
+            if pa_type.unit != "ns":
+                arr = pc.cast(arr, target_type=pa.timestamp("ns", tz=None))
+            return arr
+
+        return cls.convert(
+            arr,
+            check_type=check_type_func,
+            convert=convert_func,
+        )
+
 
 class ArrowArrayToPandasConversion:
     """
@@ -1312,6 +1345,8 @@ class ArrowArrayToPandasConversion:
             ShortType,
             IntegerType,
             LongType,
+            TimestampType,
+            TimestampNTZType,
         )
         if df_for_struct and isinstance(spark_type, StructType):
             return all(isinstance(f.dataType, supported_types) for f in spark_type.fields)
@@ -1340,22 +1375,32 @@ class ArrowArrayToPandasConversion:
             assert types.is_struct(arr.type)
             assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
-            series = [
-                cls.convert_numpy(
-                    field_arr,
-                    spark_type=field.dataType,
-                    timezone=timezone,
-                    struct_in_pandas=struct_in_pandas,
-                    ndarray_as_list=ndarray_as_list,
-                    df_for_struct=False,  # always False for child fields
-                )
-                for field_arr, field in zip(arr.flatten(), spark_type)
-            ]
-            pdf = pd.concat(series, axis=1)
+            pdf: pd.DataFrame = pd.concat(
+                [
+                    cls.convert_numpy(
+                        field_arr,
+                        spark_type=field.dataType,
+                        timezone=timezone,
+                        struct_in_pandas=struct_in_pandas,
+                        ndarray_as_list=ndarray_as_list,
+                        df_for_struct=False,  # always False for child fields
+                    )
+                    for field_arr, field in zip(arr.flatten(), spark_type)
+                ],
+                axis=1,
+            )
             pdf.columns = spark_type.names  # type: ignore[assignment]
             return pdf
-        arr = ArrowArrayConversion.localize_tz(arr)
+        # Arrow array from batch.column(idx) contains name,
+        # and this name will be used to rename the pandas series
+        # returned by array.to_pandas().
+        # Right now, the name is dropped in arrow conversions.
+        # TODO: should make convert_numpy explicitly pass the expected series name.
+        name = arr._name
+        arr = ArrowArrayConversion.preprocess_time(arr)
+
+        series: pd.Series
         # TODO(SPARK-55332): Create benchmark for pa.array -> pd.series integer conversion
         # 1, benchmark a nullable integral array
@@ -1386,24 +1431,24 @@ class ArrowArrayToPandasConversion:
         # 19.1 μs ± 242 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
         if isinstance(spark_type, ByteType):
             if arr.null_count > 0:
-                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
+                series = arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
             else:
-                return arr.to_pandas()
+                series = arr.to_pandas()
         elif isinstance(spark_type, ShortType):
             if arr.null_count > 0:
-                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
+                series = arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
             else:
-                return arr.to_pandas()
+                series = arr.to_pandas()
         elif isinstance(spark_type, IntegerType):
             if arr.null_count > 0:
-                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
+                series = arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
             else:
-                return arr.to_pandas()
+                series = arr.to_pandas()
         elif isinstance(spark_type, LongType):
             if arr.null_count > 0:
-                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+                series = arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
             else:
-                return arr.to_pandas()
+                series = arr.to_pandas()
         elif isinstance(
             spark_type,
             (
@@ -1423,16 +1468,13 @@ class ArrowArrayToPandasConversion:
             ),
         ):
             # TODO(SPARK-55333): Revisit date_as_object in arrow->pandas conversion
-            # TODO(SPARK-55334): Implement coerce_temporal_nanoseconds
             # If the given column is a date type column, creates a series of datetime.date directly
             # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
             # datetime64[ns] type handling.
-            # Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
             pandas_options = {
                 "date_as_object": True,
-                "coerce_temporal_nanoseconds": True,
             }
-            return arr.to_pandas(**pandas_options)
+            series = arr.to_pandas(**pandas_options)
         # elif isinstance(
         #     spark_type,
         #     (
@@ -1448,3 +1490,5 @@ class ArrowArrayToPandasConversion:
         # TODO(SPARK-55324): Support complex types
         else:  # pragma: no cover
             assert False, f"Need converter for {spark_type} but failed to find one."
+
+        return series.rename(name)
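Side note (not part of the commit): a hedged sketch of why the removed `coerce_temporal_nanoseconds` option becomes redundant once `preprocess_time` has cast the array to nanoseconds, assuming a pyarrow version that supports that option (the sample value is an assumption):

    # Both paths produce a datetime64[ns] series, so the to_pandas option
    # is no longer needed after the Arrow-side cast.
    import pyarrow as pa

    us_arr = pa.array([1_700_000_000_000_000], type=pa.timestamp("us"))

    # Old path: let to_pandas coerce microseconds to nanoseconds.
    s_old = us_arr.to_pandas(coerce_temporal_nanoseconds=True)

    # New path: cast in Arrow first, then call to_pandas() with no options.
    s_new = us_arr.cast(pa.timestamp("ns")).to_pandas()

    assert s_old.dtype == s_new.dtype  # both datetime64[ns]

The trailing `series.rename(name)` then restores the field name captured from `arr._name` earlier, since the intermediate Arrow conversions drop it.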
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]