This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a5ed572a6e60 [SPARK-55036][PYTHON] Add `ArrowTimestampConversion` for arrow timezone handling
a5ed572a6e60 is described below

commit a5ed572a6e607b6426b8b60b2a3ee51f00ed995e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 20 12:14:22 2026 +0800

    [SPARK-55036][PYTHON] Add `ArrowTimestampConversion` for arrow timezone handling
    
    ### What changes were proposed in this pull request?
    Add `ArrowTimestampConversion` for arrow timezone handling
    
    ### Why are the changes needed?
    Revisit the existing timezone handling in Pandas UDFs and introduce a new
    pure-PyArrow implementation for complex datatypes; the plan is to use it to
    replace the existing one in the future.
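
    A minimal sketch of the intended usage (`localize_tz` is the new helper
    added below; pyarrow is assumed to be installed):

        import datetime
        from zoneinfo import ZoneInfo
        import pyarrow as pa
        from pyspark.sql.conversion import ArrowTimestampConversion

        ts = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=ZoneInfo("Asia/Singapore"))
        arr = pa.array([[ts, None]])  # list<timestamp[us, tz=Asia/Singapore]>
        out = ArrowTimestampConversion.localize_tz(arr)
        # out has type list<timestamp[us]>: tz-aware values become tz-naive
        # wall-clock times in the timezone recorded in the Arrow type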
    
    ### Does this PR introduce _any_ user-facing change?
    no, this PR just adds a new utility
    
    ### How was this patch tested?
    added tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53800 from zhengruifeng/init_arrow_convert.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/conversion.py            | 109 ++++++++++++++++++++++++++++
 python/pyspark/sql/tests/test_conversion.py |  94 ++++++++++++++++++++++++
 2 files changed, 203 insertions(+)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index a657169ca796..dfab59b53266 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -845,3 +845,112 @@ class ArrowTableToRowsConversion:
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampConversion:
+    @classmethod
+    def _need_localization(cls, at: "pa.DataType") -> bool:
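+        # Recursively check whether the given Arrow type contains a
+        # timezone-aware timestamp anywhere in its nested structure, so
+        # localize_tz can return unchanged arrays without any work.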
+        import pyarrow.types as types
+
+        if types.is_timestamp(at) and at.tz is not None:
+            return True
+        elif (
+            types.is_list(at)
+            or types.is_large_list(at)
+            or types.is_fixed_size_list(at)
+            or types.is_dictionary(at)
+        ):
+            return cls._need_localization(at.value_type)
+        elif types.is_map(at):
+            return any(cls._need_localization(dt) for dt in [at.key_type, at.item_type])
+        elif types.is_struct(at):
+            return any(cls._need_localization(field.type) for field in at)
+        else:
+            return False
+
+    @staticmethod
+    def localize_tz(arr: "pa.Array") -> "pa.Array":
+        """
+        Convert Arrow timezone-aware timestamps to timezone-naive values in the
+        timezone recorded in the Arrow type.
+        This function works on Arrow Arrays, and it recurses to convert nested types.
+        This function is dedicated to Pandas UDF execution.
+
+        Differences from _create_converter_to_pandas + _check_series_convert_timestamps_local_tz:
+        1. respect the timezone field in the pyarrow timestamp type;
+        2. never use the machine's local time;
+        3. handle nested types in a consistent way (_create_converter_to_pandas handles
+        simple timestamp series with the session timezone, but handles nested series
+        with datetime.timezone.utc).
+
+        Differences from _check_arrow_array_timestamps_localize:
+        1. respect the timezone field in the pyarrow timestamp type;
+        2. do not handle timezone-naive timestamps;
+        3. do not support unit coercion, which won't happen in UDF execution.
+
+        Parameters
+        ----------
+        arr : :class:`pyarrow.Array`
+
+        Returns
+        -------
+        :class:`pyarrow.Array`
+
+        Notes
+        -----
+        Arrow UDFs (@arrow_udf/mapInArrow/etc.) always preserve the original
+        timezone, and thus don't need this conversion.
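+
+        Examples
+        --------
+        A minimal sketch of the expected behavior (microsecond unit assumed,
+        as pyarrow infers for datetime inputs):
+
+        >>> import datetime
+        >>> from zoneinfo import ZoneInfo
+        >>> import pyarrow as pa
+        >>> from pyspark.sql.conversion import ArrowTimestampConversion
+        >>> ts = datetime.datetime(2022, 1, 5, 15, 0, 1, tzinfo=ZoneInfo("Asia/Singapore"))
+        >>> arr = pa.array([ts])  # timestamp[us, tz=Asia/Singapore]
+        >>> ArrowTimestampConversion.localize_tz(arr)[0]
+        <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000'>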
+        """
+        import pyarrow as pa
+        import pyarrow.types as types
+        import pyarrow.compute as pc
+
+        pa_type = arr.type
+        if not ArrowTimestampConversion._need_localization(pa_type):
+            return arr
+
+        if types.is_timestamp(pa_type) and pa_type.tz is not None:
+            # import datetime
+            # from zoneinfo import ZoneInfo
+            # ts = datetime.datetime(2022, 1, 5, 15, 0, 1, tzinfo=ZoneInfo('Asia/Singapore'))
+            # arr = pa.array([ts])
+            # arr[0]
+            # <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000+0800'>
+            # arr = pc.local_timestamp(arr)
+            # arr[0]
+            # <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000'>
+
+            return pc.local_timestamp(arr)
+        elif types.is_list(pa_type):
+            return pa.ListArray.from_arrays(
+                offsets=arr.offsets,
+                values=ArrowTimestampConversion.localize_tz(arr.values),
+            )
+        elif types.is_large_list(pa_type):
+            return pa.LargeListArray.from_arrays(
+                offsets=arr.offsets,
+                values=ArrowTimestampConversion.localize_tz(arr.values),
+            )
+        elif types.is_fixed_size_list(pa_type):
+            return pa.FixedSizeListArray.from_arrays(
+                values=ArrowTimestampConversion.localize_tz(arr.values),
+                list_size=pa_type.list_size,
+            )
+        elif types.is_dictionary(pa_type):
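+            # Only the dictionary values can contain timestamps; the indices
+            # are reused unchanged.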
+            return pa.DictionaryArray.from_arrays(
+                indices=arr.indices,
+                dictionary=ArrowTimestampConversion.localize_tz(arr.dictionary),
+            )
+        elif types.is_map(pa_type):
+            return pa.MapArray.from_arrays(
+                offsets=arr.offsets,
+                keys=ArrowTimestampConversion.localize_tz(arr.keys),
+                items=ArrowTimestampConversion.localize_tz(arr.items),
+            )
+        elif types.is_struct(pa_type):
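+            # Rebuild the struct from its localized child arrays, keeping the
+            # original field names.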
+            return pa.StructArray.from_arrays(
+                arrays=[
+                    ArrowTimestampConversion.localize_tz(arr.field(i))
+                    for i in range(len(arr.type))
+                ],
+                names=arr.type.names,
+            )
+        else:  # pragma: no cover
+            assert False, f"Need converter for {pa_type} but failed to find one."
diff --git a/python/pyspark/sql/tests/test_conversion.py b/python/pyspark/sql/tests/test_conversion.py
index 1323652e8366..9773b2154c63 100644
--- a/python/pyspark/sql/tests/test_conversion.py
+++ b/python/pyspark/sql/tests/test_conversion.py
@@ -14,12 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import datetime
 import unittest
+from zoneinfo import ZoneInfo
 
 from pyspark.errors import PySparkValueError
 from pyspark.sql.conversion import (
     ArrowTableToRowsConversion,
     LocalDataToArrowConversion,
+    ArrowTimestampConversion,
 )
 from pyspark.sql.types import (
     ArrayType,
@@ -201,6 +204,97 @@ class ConversionTests(unittest.TestCase):
             with self.assertRaises(PySparkValueError):
                 LocalDataToArrowConversion.convert([(value,)], schema, use_large_var_types=False)
 
+    def test_arrow_array_localize_tz(self):
+        import pyarrow as pa
+
+        tz1 = ZoneInfo("Asia/Singapore")
+        tz2 = ZoneInfo("America/Los_Angeles")
+        tz3 = ZoneInfo("UTC")
+
+        ts0 = datetime.datetime(2026, 1, 5, 15, 0, 1)
+        ts1 = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=tz1)
+        ts2 = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=tz2)
+        ts3 = datetime.datetime(2026, 1, 5, 15, 0, 1, tzinfo=tz3)
+
+        # non-timestamp types
+        for arr in [
+            pa.array([1, 2]),
+            pa.array([["x", "y"]]),
+            pa.array([[[3.0, 4.0]]]),
+            pa.StructArray.from_arrays([pa.array([1, 2]), pa.array(["x", "y"])], names=["a", "b"]),
+            pa.array([{1: None, 2: "x"}], type=pa.map_(pa.int32(), pa.string())),
+        ]:
+            output = ArrowTimestampConversion.localize_tz(arr)
+            self.assertTrue(output is arr, f"MUST not generate a new array {output.tolist()}")
+
+        # timestamp types
+        for arr, expected in [
+            (pa.array([ts0, None]), pa.array([ts0, None])),  # ts-ntz
+            (pa.array([ts1, None]), pa.array([ts0, None])),  # ts-ltz
+            (pa.array([[ts2, None]]), pa.array([[ts0, None]])),  # array<ts-ltz>
+            (pa.array([[[ts3, None]]]), pa.array([[[ts0, None]]])),  # array<array<ts-ltz>>
+            (
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([ts0, None]), pa.array([ts1, None])],
+                    names=["a", "b", "c"],
+                ),
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([ts0, None]), pa.array([ts0, None])],
+                    names=["a", "b", "c"],
+                ),
+            ),  # struct<int, ts-ntz, ts-ltz>
+            (
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([[ts2], [None]])], names=["a", "b"]
+                ),
+                pa.StructArray.from_arrays(
+                    [pa.array([1, 2]), pa.array([[ts0], [None]])], names=["a", "b"]
+                ),
+            ),  # struct<int, array<ts-ltz>>
+            (
+                pa.StructArray.from_arrays(
+                    [
+                        pa.array([ts2, None]),
+                        pa.StructArray.from_arrays(
+                            [pa.array(["a", "b"]), pa.array([[ts3], [None]])], 
names=["x", "y"]
+                        ),
+                    ],
+                    names=["a", "b"],
+                ),
+                pa.StructArray.from_arrays(
+                    [
+                        pa.array([ts0, None]),
+                        pa.StructArray.from_arrays(
+                            [pa.array(["a", "b"]), pa.array([[ts0], [None]])], 
names=["x", "y"]
+                        ),
+                    ],
+                    names=["a", "b"],
+                ),
+            ),  # struct<ts-ltz, struct<str, array<ts-ltz>>>
+            (
+                pa.array(
+                    [{1: None, 2: ts1}],
+                    type=pa.map_(pa.int32(), pa.timestamp("us", tz=tz1)),
+                ),
+                pa.array(
+                    [{1: None, 2: ts0}],
+                    type=pa.map_(pa.int32(), pa.timestamp("us")),
+                ),
+            ),  # map<int, ts-ltz>
+            (
+                pa.array(
+                    [{1: [None], 2: [ts2, None]}],
+                    type=pa.map_(pa.int32(), pa.list_(pa.timestamp("us", tz=tz2))),
+                ),
+                pa.array(
+                    [{1: [None], 2: [ts0, None]}],
+                    type=pa.map_(pa.int32(), pa.list_(pa.timestamp("us"))),
+                ),
+            ),  # map<int, array<ts-ltz>>
+        ]:
+            output = ArrowTimestampConversion.localize_tz(arr)
+            self.assertEqual(output, expected, f"{output.tolist()} != {expected.tolist()}")
+
 
 if __name__ == "__main__":
     from pyspark.testing import main


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
