This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a5ed572a6e60 [SPARK-55036][PYTHON] Add `ArrowTimestampConversion` for Arrow timezone handling
a5ed572a6e60 is described below
commit a5ed572a6e607b6426b8b60b2a3ee51f00ed995e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 20 12:14:22 2026 +0800
[SPARK-55036][PYTHON] Add `ArrowTimestampConversion` for Arrow timezone handling
### What changes were proposed in this pull request?
Add `ArrowTimestampConversion` for Arrow timezone handling.
### Why are the changes needed?
This revisits the existing timezone handling in Pandas UDFs and introduces a new, pure-PyArrow implementation that also covers complex (nested) data types. The plan is to eventually replace the existing handling with this one.
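As a rough usage sketch (illustrative only, not part of this commit; it assumes a timezone-aware input array and mirrors the example in the new docstring):

```python
import datetime
from zoneinfo import ZoneInfo

import pyarrow as pa

from pyspark.sql.conversion import ArrowTimestampConversion

# A timezone-aware timestamp array; the timezone is carried in the Arrow type.
ts = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=ZoneInfo("Asia/Singapore"))
arr = pa.array([ts])  # type: timestamp[us, tz=Asia/Singapore]

# Convert to a timezone-naive array holding the same wall-clock time.
out = ArrowTimestampConversion.localize_tz(arr)
assert out.type == pa.timestamp("us")  # timezone field dropped
assert out[0].as_py() == datetime.datetime(2026, 1, 5, 15, 0, 1)
```

Nested types (lists, maps, structs, dictionaries) are converted recursively; timezone-naive timestamps and non-timestamp arrays are returned unchanged.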
### Does this PR introduce _any_ user-facing change?
No, this PR only adds a new utility class.
### How was this patch tested?
Added tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53800 from zhengruifeng/init_arrow_convert.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/conversion.py | 109 ++++++++++++++++++++++++++++
python/pyspark/sql/tests/test_conversion.py | 94 ++++++++++++++++++++++++
2 files changed, 203 insertions(+)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index a657169ca796..dfab59b53266 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -845,3 +845,112 @@ class ArrowTableToRowsConversion:
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampConversion:
+    @classmethod
+    def _need_localization(cls, at: "pa.DataType") -> bool:
+        import pyarrow.types as types
+
+        if types.is_timestamp(at) and at.tz is not None:
+            return True
+        elif (
+            types.is_list(at)
+            or types.is_large_list(at)
+            or types.is_fixed_size_list(at)
+            or types.is_dictionary(at)
+        ):
+            return cls._need_localization(at.value_type)
+        elif types.is_map(at):
+            return any(cls._need_localization(dt) for dt in [at.key_type, at.item_type])
+        elif types.is_struct(at):
+            return any(cls._need_localization(field.type) for field in at)
+        else:
+            return False
+
+    @staticmethod
+    def localize_tz(arr: "pa.Array") -> "pa.Array":
+        """
+        Convert Arrow timezone-aware timestamps to timezone-naive timestamps in the
+        specified timezone. This function works on Arrow Arrays, and it recurses to
+        convert nested types. It is dedicated to Pandas UDF execution.
+
+        Differences from _create_converter_to_pandas + _check_series_convert_timestamps_local_tz:
+        1. respect the timezone field in the pyarrow timestamp type;
+        2. do not use the local time at any point;
+        3. handle nested types in a consistent way (_create_converter_to_pandas handles
+           simple timestamp series with the session timezone, but handles nested series
+           with datetime.timezone.utc).
+
+        Differences from _check_arrow_array_timestamps_localize:
+        1. respect the timezone field in the pyarrow timestamp type;
+        2. do not handle timezone-naive timestamps;
+        3. do not support unit coercion, which won't happen in UDF execution.
+
+        Parameters
+        ----------
+        arr : :class:`pyarrow.Array`
+
+        Returns
+        -------
+        :class:`pyarrow.Array`
+
+        Notes
+        -----
+        Arrow UDFs (@arrow_udf/mapInArrow/etc.) always preserve the original timezone,
+        and thus don't need this conversion.
+        """
+        import pyarrow as pa
+        import pyarrow.types as types
+        import pyarrow.compute as pc
+
+        pa_type = arr.type
+        if not ArrowTimestampConversion._need_localization(pa_type):
+            return arr
+
+        if types.is_timestamp(pa_type) and pa_type.tz is not None:
+            # import datetime
+            # from zoneinfo import ZoneInfo
+            # ts = datetime.datetime(2022, 1, 5, 15, 0, 1, tzinfo=ZoneInfo('Asia/Singapore'))
+            # arr = pa.array([ts])
+            # arr[0]
+            # <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000+0800'>
+            # arr = pc.local_timestamp(arr)
+            # arr[0]
+            # <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000'>
+
+            return pc.local_timestamp(arr)
+        elif types.is_list(pa_type):
+            return pa.ListArray.from_arrays(
+                offsets=arr.offsets,
+                values=ArrowTimestampConversion.localize_tz(arr.values),
+            )
+        elif types.is_large_list(pa_type):
+            return pa.LargeListArray.from_arrays(
+                offsets=arr.offsets,
+                values=ArrowTimestampConversion.localize_tz(arr.values),
+            )
+        elif types.is_fixed_size_list(pa_type):
+            return pa.FixedSizeListArray.from_arrays(
+                values=ArrowTimestampConversion.localize_tz(arr.values),
+                list_size=pa_type.list_size,
+            )
+        elif types.is_dictionary(pa_type):
+            return pa.DictionaryArray.from_arrays(
+                indices=arr.indices,
+                dictionary=ArrowTimestampConversion.localize_tz(arr.dictionary),
+            )
+        elif types.is_map(pa_type):
+            return pa.MapArray.from_arrays(
+                offsets=arr.offsets,
+                keys=ArrowTimestampConversion.localize_tz(arr.keys),
+                items=ArrowTimestampConversion.localize_tz(arr.items),
+            )
+        elif types.is_struct(pa_type):
+            return pa.StructArray.from_arrays(
+                arrays=[
+                    ArrowTimestampConversion.localize_tz(arr.field(i)) for i in range(len(arr.type))
+                ],
+                names=arr.type.names,
+            )
+        else:  # pragma: no cover
+            assert False, f"Need converter for {pa_type} but failed to find one."
diff --git a/python/pyspark/sql/tests/test_conversion.py b/python/pyspark/sql/tests/test_conversion.py
index 1323652e8366..9773b2154c63 100644
--- a/python/pyspark/sql/tests/test_conversion.py
+++ b/python/pyspark/sql/tests/test_conversion.py
@@ -14,12 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import datetime
 import unittest
+from zoneinfo import ZoneInfo
 
 from pyspark.errors import PySparkValueError
 from pyspark.sql.conversion import (
     ArrowTableToRowsConversion,
     LocalDataToArrowConversion,
+    ArrowTimestampConversion,
 )
 from pyspark.sql.types import (
     ArrayType,
@@ -201,6 +204,97 @@ class ConversionTests(unittest.TestCase):
         with self.assertRaises(PySparkValueError):
             LocalDataToArrowConversion.convert([(value,)], schema, use_large_var_types=False)
 
+    def test_arrow_array_localize_tz(self):
+        import pyarrow as pa
+
+        tz1 = ZoneInfo("Asia/Singapore")
+        tz2 = ZoneInfo("America/Los_Angeles")
+        tz3 = ZoneInfo("UTC")
+
+        ts0 = datetime.datetime(2026, 1, 5, 15, 0, 1)
+        ts1 = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=tz1)
+        ts2 = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=tz2)
+        ts3 = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=tz3)
+
+        # non-timestamp types
+        for arr in [
+            pa.array([1, 2]),
+            pa.array([["x", "y"]]),
+            pa.array([[[3.0, 4.0]]]),
+            pa.StructArray.from_arrays([pa.array([1, 2]), pa.array(["x", "y"])], names=["a", "b"]),
+            pa.array([{1: None, 2: "x"}], type=pa.map_(pa.int32(), pa.string())),
+        ]:
+            output = ArrowTimestampConversion.localize_tz(arr)
+            self.assertTrue(output is arr, f"MUST not generate a new array {output.tolist()}")
+
+        # timestamp types
+        for arr, expected in [
+            (pa.array([ts0, None]), pa.array([ts0, None])),  # ts-ntz
+            (pa.array([ts1, None]), pa.array([ts0, None])),  # ts-ltz
+            (pa.array([[ts2, None]]), pa.array([[ts0, None]])),  # array<ts-ltz>
+            (pa.array([[[ts3, None]]]), pa.array([[[ts0, None]]])),  # array<array<ts-ltz>>
+            (
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([ts0, None]), pa.array([ts1, None])],
+                    names=["a", "b", "c"],
+                ),
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([ts0, None]), pa.array([ts0, None])],
+                    names=["a", "b", "c"],
+                ),
+            ),  # struct<int, ts-ntz, ts-ltz>
+            (
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([[ts2], [None]])], names=["a", "b"]
+                ),
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([[ts0], [None]])], names=["a", "b"]
+                ),
+            ),  # struct<int, array<ts-ltz>>
+            (
+                pa.StructArray.from_arrays(
+                    [
+                        pa.array([ts2, None]),
+                        pa.StructArray.from_arrays(
+                            [pa.array(["a", "b"]), pa.array([[ts3], [None]])], names=["x", "y"]
+                        ),
+                    ],
+                    names=["a", "b"],
+                ),
+                pa.StructArray.from_arrays(
+                    [
+                        pa.array([ts0, None]),
+                        pa.StructArray.from_arrays(
+                            [pa.array(["a", "b"]), pa.array([[ts0], [None]])], names=["x", "y"]
+                        ),
+                    ],
+                    names=["a", "b"],
+                ),
+            ),  # struct<ts-ltz, struct<str, array<ts-ltz>>>
+            (
+                pa.array(
+                    [{1: None, 2: ts1}],
+                    type=pa.map_(pa.int32(), pa.timestamp("us", tz=tz1)),
+                ),
+                pa.array(
+                    [{1: None, 2: ts0}],
+                    type=pa.map_(pa.int32(), pa.timestamp("us")),
+                ),
+            ),  # map<int, ts-ltz>
+            (
+                pa.array(
+                    [{1: [None], 2: [ts2, None]}],
+                    type=pa.map_(pa.int32(), pa.list_(pa.timestamp("us", tz=tz2))),
+                ),
+                pa.array(
+                    [{1: [None], 2: [ts0, None]}],
+                    type=pa.map_(pa.int32(), pa.list_(pa.timestamp("us"))),
+                ),
+            ),  # map<int, array<ts-ltz>>
+        ]:
+            output = ArrowTimestampConversion.localize_tz(arr)
+            self.assertEqual(output, expected, f"{output.tolist()} != {expected.tolist()}")
+
 
 if __name__ == "__main__":
     from pyspark.testing import main