This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9301ab76ee [python] Fix non-compact decimal and timestamp serialization in BinaryRow (#7608)
9301ab76ee is described below

commit 9301ab76ee9b8631a1b766c2d44c627aff319b7b
Author: Jiajia Li <[email protected]>
AuthorDate: Sun May 24 10:04:25 2026 +0800

    [python] Fix non-compact decimal and timestamp serialization in BinaryRow (#7608)
    
    Previously, Python's `GenericRowSerializer` treated all decimal and
    timestamp fields as compact format (value stored directly in the 8-byte
    fixed slot). Java's BinaryRow uses two formats, selected by the declared
    precision of the column:
    
    | Type | Compact | Non-compact |
    |------|---------|-------------|
    | DECIMAL | precision ≤ 18: unscaled long in fixed slot | precision > 18: big-endian bytes in variable area, `(offset << 32 \| length)` in fixed slot |
    | TIMESTAMP | precision ≤ 3: epoch millis in fixed slot | precision > 3: epoch millis in variable area, `(offset << 32 \| nanoOfMillisecond)` in fixed slot |
---
 paimon-python/pypaimon/table/row/generic_row.py | 217 ++++++++++++---
 paimon-python/pypaimon/tests/decimal_test.py    | 356 ++++++++++++++++++++++++
 paimon-python/pypaimon/tests/timestamp_test.py  | 225 +++++++++++++++
 3 files changed, 759 insertions(+), 39 deletions(-)

diff --git a/paimon-python/pypaimon/table/row/generic_row.py 
b/paimon-python/pypaimon/table/row/generic_row.py
index e224fb3495..79f898d1f2 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -15,17 +15,87 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import calendar
+import decimal
 import struct
+from dataclasses import dataclass
 from datetime import date, datetime, time, timedelta
 from decimal import Decimal
 from typing import Any, List, Union
 
-from dataclasses import dataclass
-
 from pypaimon.schema.data_types import AtomicType, DataField, DataType
 from pypaimon.table.row.binary_row import BinaryRow
-from pypaimon.table.row.internal_row import InternalRow, RowKind
 from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.internal_row import InternalRow, RowKind
+
+_DECIMAL_CTX = decimal.Context(prec=100, rounding=decimal.ROUND_HALF_UP)
+
+
+def _decimal_to_unscaled_with_check(d: Decimal, precision: int, scale: int):
+    """Round decimal with HALF_UP, check precision overflow, and return 
unscaled value.
+    Returns (unscaled_int, True) on overflow, (unscaled_int, False) on 
success."""
+    rounded = d.quantize(Decimal(10) ** -scale, context=_DECIMAL_CTX)
+    _, digits, _ = rounded.as_tuple()
+    if rounded != 0 and len(digits) > precision:
+        return 0, True
+    return int(rounded.scaleb(scale, context=_DECIMAL_CTX)), False
+
+
+def _parse_type_precision_scale(data_type):
+    """Parse precision and scale from type string like DECIMAL(38, 10).
+
+    Falls back to Java-side defaults for parameter-less types:
+      - DECIMAL / NUMERIC -> (10, 0)  (DecimalType.DEFAULT_PRECISION / 
DEFAULT_SCALE)
+      - TIMESTAMP / TIMESTAMP_LTZ / TIMESTAMP WITH LOCAL TIME ZONE -> (6, 0)
+        (TimestampType.DEFAULT_PRECISION)
+    """
+    type_str = str(data_type).upper().strip()
+    if '(' in type_str and ')' in type_str:
+        try:
+            params_str = type_str.split('(', 1)[1].split(')', 1)[0]
+            parts = [p.strip() for p in params_str.split(',')]
+            precision = int(parts[0])
+            scale = int(parts[1]) if len(parts) > 1 else 0
+            return precision, scale
+        except (ValueError, IndexError):
+            pass
+    # Strip trailing NOT NULL / nullability suffixes and any parenthesised
+    # params — handles "DECIMAL NOT NULL" and malformed "DECIMAL()" alike.
+    head = type_str.split('(', 1)[0].split()[0] if type_str.strip() else ''
+    if head in ('DECIMAL', 'NUMERIC'):
+        return 10, 0
+    if head in ('TIMESTAMP', 'TIMESTAMP_LTZ'):
+        return 6, 0
+    return 0, 0
+
+
+_EPOCH = datetime(1970, 1, 1)
+
+
+def _datetime_to_millis_and_nanos(value: datetime):
+    """Convert datetime to (epoch_millis, nano_of_millisecond) without float 
arithmetic.
+
+    Python's datetime is microsecond-resolution, so nano_of_millisecond is
+    always a multiple of 1000 and sub-microsecond precision is lost. For
+    TIMESTAMP(7..9) columns the lower three nano digits will always serialise
+    as zero — round-trip through pypaimon is microsecond-faithful, not
+    nanosecond-faithful.
+    """
+    epoch_seconds = calendar.timegm(value.timetuple())
+    millis = epoch_seconds * 1000 + value.microsecond // 1000
+    nano_of_millisecond = (value.microsecond % 1000) * 1000
+    return millis, nano_of_millisecond
+
+
+def _millis_nanos_to_datetime(millis: int, nano_of_millisecond: int = 0) -> 
datetime:
+    """Convert (epoch_millis, nano_of_millisecond) to datetime. Nanos 
truncated to micros."""
+    total_micros = millis * 1000 + nano_of_millisecond // 1000
+    seconds = total_micros // 1_000_000
+    micros = total_micros % 1_000_000
+    if micros < 0:
+        seconds -= 1
+        micros += 1_000_000
+    return _EPOCH + timedelta(seconds=seconds, microseconds=micros)
 
 
 @dataclass
@@ -232,26 +302,54 @@ class GenericRowDeserializer:
         return BlobData.from_bytes(binary_data)
 
     @classmethod
-    def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: 
int, data_type: DataType) -> Decimal:
-        unscaled_long = struct.unpack('<q', 
bytes_data[field_offset:field_offset + 8])[0]
-        type_str = str(data_type)
-        if '(' in type_str and ')' in type_str:
-            try:
-                precision_scale = type_str.split('(')[1].split(')')[0]
-                if ',' in precision_scale:
-                    scale = int(precision_scale.split(',')[1])
-                else:
-                    scale = 0
-            except:
-                scale = 0
+    def _unscaled_to_decimal(cls, unscaled_value: int, scale: int) -> Decimal:
+        sign = 0 if unscaled_value >= 0 else 1
+        digits = tuple(int(d) for d in str(abs(unscaled_value))) if 
unscaled_value != 0 else (0,)
+        return Decimal((sign, digits, -scale))
+
+    @classmethod
+    def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: 
int, data_type: DataType):
+        """Parse a decimal field, returning None when the on-disk value exceeds
+        the declared precision. Mirrors Java's `BinaryRow.getDecimal()`, which
+        returns null on overflow; callers must treat the field as a regular
+        nullable cell (the surrounding GenericRow API already does this — None
+        flows out as a null column value, no further handling required).
+        """
+        precision, scale = _parse_type_precision_scale(data_type)
+        if precision <= 0:
+            raise ValueError(f"Decimal requires precision > 0, got 
{precision}")
+        if precision <= 18:
+            # Compact: unscaled long in fixed part
+            unscaled_long = struct.unpack('<q', 
bytes_data[field_offset:field_offset + 8])[0]
+            return cls._unscaled_to_decimal(unscaled_long, scale)
         else:
-            scale = 0
-        return Decimal(unscaled_long) / (10 ** scale)
+            # Non-compact: (cursor << 32 | byte_length) in fixed part, bytes 
in var area
+            offset_and_len = struct.unpack('<q', 
bytes_data[field_offset:field_offset + 8])[0]
+            cursor = (offset_and_len >> 32) & 0xFFFFFFFF
+            byte_length = offset_and_len & 0xFFFFFFFF
+            var_offset = base_offset + cursor
+            unscaled_bytes = bytes_data[var_offset:var_offset + byte_length]
+            unscaled_value = int.from_bytes(unscaled_bytes, byteorder='big', 
signed=True)
+            result = cls._unscaled_to_decimal(unscaled_value, scale)
+            _, digits, _ = result.as_tuple()
+            if result != 0 and len(digits) > precision:
+                return None
+            return result
 
     @classmethod
     def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, 
field_offset: int, data_type: DataType) -> datetime:
-        millis = struct.unpack('<q', bytes_data[field_offset:field_offset + 
8])[0]
-        return datetime.fromtimestamp(millis / 1000.0, tz=None)
+        precision, _ = _parse_type_precision_scale(data_type)
+        if precision <= 3:
+            # Compact: epoch millis in fixed part
+            millis = struct.unpack('<q', bytes_data[field_offset:field_offset 
+ 8])[0]
+            return _millis_nanos_to_datetime(millis)
+        else:
+            # Non-compact: (cursor << 32 | nanoOfMillisecond) in fixed part, 
millis in var area
+            offset_and_nanos = struct.unpack('<q', 
bytes_data[field_offset:field_offset + 8])[0]
+            nano_of_millisecond = offset_and_nanos & 0xFFFFFFFF
+            sub_offset = (offset_and_nanos >> 32) & 0xFFFFFFFF
+            millis = struct.unpack('<q', bytes_data[base_offset + 
sub_offset:base_offset + sub_offset + 8])[0]
+            return _millis_nanos_to_datetime(millis, nano_of_millisecond)
 
     @classmethod
     def _parse_date(cls, bytes_data: bytes, field_offset: int) -> date:
@@ -300,9 +398,46 @@ class GenericRowSerializer:
                 raise ValueError(f"BinaryRow only support AtomicType yet, meet 
{field.type.__class__}")
 
             type_name = field.type.type.upper()
-            if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 
'STRING',
-                                                     'BINARY', 'VARBINARY', 
'BYTES', 'BLOB']):
-                if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 
'STRING']):
+            is_var_len_type = any(type_name.startswith(p) for p in [
+                'CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES', 
'BLOB'])
+            is_decimal_type = type_name.startswith('DECIMAL') or 
type_name.startswith('NUMERIC')
+            is_timestamp_type = type_name.startswith('TIMESTAMP')
+            if is_decimal_type or is_timestamp_type:
+                precision, scale = _parse_type_precision_scale(field.type)
+            else:
+                precision, scale = 0, 0
+            is_high_precision_decimal = is_decimal_type and precision > 18
+            is_non_compact_timestamp = is_timestamp_type and precision > 3
+
+            if is_decimal_type and value is not None:
+                d = value if isinstance(value, Decimal) else 
Decimal(str(value))
+                unscaled_value, overflow = _decimal_to_unscaled_with_check(d, 
precision, scale)
+                if overflow:
+                    cls._set_null_bit(fixed_part, 0, i)
+                    struct.pack_into('<q', fixed_part, field_fixed_offset, 0)
+                    continue
+
+            if is_non_compact_timestamp:
+                # Non-compact: millis in var area, (offset << 32 | 
nanoOfMilli) in fixed part
+                if value.tzinfo is not None:
+                    raise RuntimeError("datetime tzinfo not supported yet")
+                ts_millis, nano_of_millisecond = 
_datetime_to_millis_and_nanos(value)
+                var_value_bytes = struct.pack('<q', ts_millis)
+                offset_in_variable_part = current_variable_offset
+                variable_part_data.append(var_value_bytes)
+                current_variable_offset += 8
+                absolute_offset = fixed_part_size + offset_in_variable_part
+                offset_and_nano = (absolute_offset << 32) | nano_of_millisecond
+                struct.pack_into('<q', fixed_part, field_fixed_offset, 
offset_and_nano)
+            elif is_var_len_type or is_high_precision_decimal:
+                if is_high_precision_decimal:
+                    # Big-endian signed bytes
+                    if unscaled_value == 0:
+                        value_bytes = b'\x00'
+                    else:
+                        byte_length = (unscaled_value.bit_length() + 8) // 8  
# +8 for sign bit
+                        value_bytes = unscaled_value.to_bytes(byte_length, 
byteorder='big', signed=True)
+                elif any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 
'STRING']):
                     value_bytes = str(value).encode('utf-8')
                 elif type_name == 'BLOB':
                     value_bytes = value.to_data()
@@ -310,14 +445,18 @@ class GenericRowSerializer:
                     value_bytes = bytes(value)
 
                 length = len(value_bytes)
-                if length <= cls.MAX_FIX_PART_DATA_SIZE:
+                if length <= cls.MAX_FIX_PART_DATA_SIZE and not 
is_high_precision_decimal:
                     fixed_part[field_fixed_offset: field_fixed_offset + 
length] = value_bytes
                     for j in range(length, 7):
                         fixed_part[field_fixed_offset + j] = 0
                     header_byte = 0x80 | length
                     fixed_part[field_fixed_offset + 7] = header_byte
                 else:
-                    var_length = 
cls._round_number_of_bytes_to_nearest_word(len(value_bytes))
+                    # Non-compact decimal: fixed 16 bytes; others: 8-byte 
aligned
+                    if is_high_precision_decimal:
+                        var_length = 16
+                    else:
+                        var_length = 
cls._round_number_of_bytes_to_nearest_word(len(value_bytes))
                     var_value_bytes = value_bytes + b'\x00' * (var_length - 
length)
                     offset_in_variable_part = current_variable_offset
                     variable_part_data.append(var_value_bytes)
@@ -364,8 +503,18 @@ class GenericRowSerializer:
         elif type_name in ['DOUBLE']:
             return cls._serialize_double(value)
         elif type_name.startswith('DECIMAL') or 
type_name.startswith('NUMERIC'):
+            precision, _ = _parse_type_precision_scale(data_type)
+            if precision > 18:
+                raise ValueError(
+                    f"Non-compact decimal (precision={precision}) must be 
serialized "
+                    f"via the variable-length path in to_bytes(), not 
_serialize_field_value()")
             return cls._serialize_decimal(value, data_type)
         elif type_name.startswith('TIMESTAMP'):
+            precision = _parse_type_precision_scale(data_type)[0]
+            if precision > 3:
+                raise ValueError(
+                    f"Non-compact timestamp (precision={precision}) must be 
serialized "
+                    f"via the variable-length path in to_bytes(), not 
_serialize_field_value()")
             return cls._serialize_timestamp(value)
         elif type_name in ['DATE']:
             return cls._serialize_date(value) + b'\x00' * 4
@@ -404,27 +553,17 @@ class GenericRowSerializer:
 
     @classmethod
     def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
-        type_str = str(data_type)
-        if '(' in type_str and ')' in type_str:
-            try:
-                precision_scale = type_str.split('(')[1].split(')')[0]
-                if ',' in precision_scale:
-                    scale = int(precision_scale.split(',')[1])
-                else:
-                    scale = 0
-            except:
-                scale = 0
-        else:
-            scale = 0
-
-        unscaled_value = int(value * (10 ** scale))
+        """Compact decimal: unscaled long in fixed part."""
+        precision, scale = _parse_type_precision_scale(data_type)
+        d = value if isinstance(value, Decimal) else Decimal(str(value))
+        unscaled_value, _ = _decimal_to_unscaled_with_check(d, precision, 
scale)
         return struct.pack('<q', unscaled_value)
 
     @classmethod
     def _serialize_timestamp(cls, value: datetime) -> bytes:
         if value.tzinfo is not None:
             raise RuntimeError("datetime tzinfo not supported yet")
-        millis = int(value.timestamp() * 1000)
+        millis, _ = _datetime_to_millis_and_nanos(value)
         return struct.pack('<q', millis)
 
     @classmethod
diff --git a/paimon-python/pypaimon/tests/decimal_test.py 
b/paimon-python/pypaimon/tests/decimal_test.py
new file mode 100644
index 0000000000..fa90b3d8a2
--- /dev/null
+++ b/paimon-python/pypaimon/tests/decimal_test.py
@@ -0,0 +1,356 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import struct
+import unittest
+from decimal import Decimal
+
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
+                                            GenericRowSerializer,
+                                            _decimal_to_unscaled_with_check)
+from pypaimon.table.row.row_kind import RowKind
+
+
+class DecimalTest(unittest.TestCase):
+    """Tests for decimal serialization/deserialization in GenericRow."""
+
+    def test_decimal_compact(self):
+        """Test compact decimal (precision <= 18) round-trip."""
+        # precision=4, scale=2, unscaled=5 => 0.05
+        fields = [
+            DataField(0, "d", AtomicType("DECIMAL(4, 2)")),
+            DataField(1, "d2", AtomicType("DECIMAL(4, 2)")),
+        ]
+        row = GenericRow([Decimal("0.05"), None], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+        self.assertEqual(str(result.values[0]), "0.05")
+        self.assertIsNone(result.values[1])
+
+        # Another compact value: 0.06
+        row2 = GenericRow([Decimal("0.06"), None], fields, RowKind.INSERT)
+        serialized2 = GenericRowSerializer.to_bytes(row2)
+        result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+        self.assertEqual(str(result2.values[0]), "0.06")
+
+    def test_decimal_not_compact(self):
+        """Test non-compact decimal (precision > 18) round-trip."""
+        # precision=25, scale=5
+        fields = [
+            DataField(0, "d", AtomicType("DECIMAL(25, 5)")),
+            DataField(1, "d2", AtomicType("DECIMAL(25, 5)")),
+        ]
+        row = GenericRow([Decimal("5.55000"), None], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+        self.assertEqual(str(result.values[0]), "5.55000")
+        self.assertIsNone(result.values[1])
+
+        # Another value: 6.55
+        row2 = GenericRow([Decimal("6.55000"), None], fields, RowKind.INSERT)
+        serialized2 = GenericRowSerializer.to_bytes(row2)
+        result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+        self.assertEqual(str(result2.values[0]), "6.55000")
+
+        # Negative value
+        row3 = GenericRow([Decimal("-123.45000"), None], fields, 
RowKind.INSERT)
+        serialized3 = GenericRowSerializer.to_bytes(row3)
+        result3 = GenericRowDeserializer.from_bytes(serialized3, fields)
+        self.assertEqual(str(result3.values[0]), "-123.45000")
+
+    def test_decimal_high_precision_large_value(self):
+        """Test high-precision decimal with large values that exceed long 
range."""
+        fields = [DataField(0, "d", AtomicType("DECIMAL(38, 10)"))]
+
+        test_values = [
+            Decimal("12345678901234567890.1234567890"),
+            Decimal("-99999999999999999999.9999999999"),
+            Decimal("0E-10"),
+        ]
+
+        for val in test_values:
+            with self.subTest(value=val):
+                row = GenericRow([val], fields, RowKind.INSERT)
+                serialized = GenericRowSerializer.to_bytes(row)
+                result = GenericRowDeserializer.from_bytes(serialized, fields)
+                self.assertEqual(result.values[0], val)
+
+    def test_decimal_mixed_with_other_types(self):
+        """Test decimal fields mixed with other types in a single row."""
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+            DataField(2, "compact_dec", AtomicType("DECIMAL(10, 2)")),
+            DataField(3, "high_dec", AtomicType("DECIMAL(38, 2)")),
+            DataField(4, "score", AtomicType("DOUBLE")),
+        ]
+
+        row = GenericRow(
+            [42, "test_row", Decimal("12345.67"), Decimal("12312455.22"), 
3.14],
+            fields, RowKind.INSERT
+        )
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+        self.assertEqual(result.values[0], 42)
+        self.assertEqual(result.values[1], "test_row")
+        self.assertEqual(result.values[2], Decimal("12345.67"))
+        self.assertEqual(result.values[3], Decimal("12312455.22"))
+        self.assertAlmostEqual(result.values[4], 3.14)
+
+    def test_decimal_compact_binary_format(self):
+        """Verify compact decimal binary layout: unscaled long in fixed 
part."""
+        fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))]
+        row = GenericRow([Decimal("0.05")], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        # Skip 4-byte arity prefix
+        data = serialized[4:]
+        null_bits_size = 8  # ((1 + 63 + 8) // 64) * 8
+        field_offset = null_bits_size
+        unscaled_long = struct.unpack('<q', data[field_offset:field_offset + 
8])[0]
+        # Decimal("0.05") with scale=2 => unscaled = 5
+        self.assertEqual(unscaled_long, 5)
+
+    def test_decimal_not_compact_binary_format(self):
+        """Verify non-compact decimal binary layout: (offset << 32 | length) 
in fixed part,
+        16-byte big-endian unscaled bytes in variable part.
+        """
+        fields = [DataField(0, "d", AtomicType("DECIMAL(25, 5)"))]
+        row = GenericRow([Decimal("5.55000")], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        # Skip 4-byte arity prefix
+        data = serialized[4:]
+        null_bits_size = 8
+        field_offset = null_bits_size
+        fixed_part_size = null_bits_size + 1 * 8
+
+        offset_and_len = struct.unpack('<q', data[field_offset:field_offset + 
8])[0]
+        cursor = (offset_and_len >> 32) & 0xFFFFFFFF
+        byte_length = offset_and_len & 0xFFFFFFFF
+
+        # cursor should point to the variable area (== fixed_part_size)
+        self.assertEqual(cursor, fixed_part_size)
+        # variable area should be exactly 16 bytes
+        var_area = data[cursor:]
+        self.assertEqual(len(var_area), 16)
+        # unscaled bytes are big-endian signed
+        unscaled_bytes = data[cursor:cursor + byte_length]
+        unscaled_value = int.from_bytes(unscaled_bytes, byteorder='big', 
signed=True)
+        # Decimal("5.55000") with scale=5 => unscaled = 555000
+        self.assertEqual(unscaled_value, 555000)
+
+    def test_decimal_boundary_precision(self):
+        """Test boundary: DECIMAL(18, ...) is compact, DECIMAL(19, ...) is 
non-compact."""
+        # precision=18: last compact
+        fields_18 = [DataField(0, "d", AtomicType("DECIMAL(18, 4)"))]
+        row_18 = GenericRow([Decimal("12345678901234.5678")], fields_18, 
RowKind.INSERT)
+        s_18 = GenericRowSerializer.to_bytes(row_18)
+        r_18 = GenericRowDeserializer.from_bytes(s_18, fields_18)
+        self.assertEqual(r_18.values[0], Decimal("12345678901234.5678"))
+        # verify compact: no variable area beyond fixed part
+        data_18 = s_18[4:]
+        null_bits_size = 8
+        fixed_part_size = null_bits_size + 1 * 8
+        self.assertEqual(len(data_18), fixed_part_size)
+
+        # precision=19: first non-compact
+        fields_19 = [DataField(0, "d", AtomicType("DECIMAL(19, 4)"))]
+        row_19 = GenericRow([Decimal("12345678901234.5678")], fields_19, 
RowKind.INSERT)
+        s_19 = GenericRowSerializer.to_bytes(row_19)
+        r_19 = GenericRowDeserializer.from_bytes(s_19, fields_19)
+        self.assertEqual(r_19.values[0], Decimal("12345678901234.5678"))
+        # verify non-compact: has 16-byte variable area
+        data_19 = s_19[4:]
+        self.assertEqual(len(data_19), fixed_part_size + 16)
+
+    def test_decimal_zero_different_scales(self):
+        """Test zero value with different precisions and scales."""
+        test_cases = [
+            ("DECIMAL(38, 0)", Decimal("0")),
+            ("DECIMAL(38, 10)", Decimal("0E-10")),
+            ("DECIMAL(10, 2)", Decimal("0.00")),
+        ]
+        for type_str, val in test_cases:
+            with self.subTest(type=type_str):
+                fields = [DataField(0, "d", AtomicType(type_str))]
+                row = GenericRow([val], fields, RowKind.INSERT)
+                serialized = GenericRowSerializer.to_bytes(row)
+                result = GenericRowDeserializer.from_bytes(serialized, fields)
+                self.assertEqual(result.values[0], val)
+
+    def test_decimal_half_up_rounding(self):
+        """Excess fractional digits should be rounded with HALF_UP."""
+        fields = [DataField(0, "d", AtomicType("DECIMAL(10, 2)"))]
+
+        test_cases = [
+            (Decimal("1.999"), Decimal("2.00")),    # .999 rounds up
+            (Decimal("1.235"), Decimal("1.24")),    # .235 rounds up (HALF_UP)
+            (Decimal("1.234"), Decimal("1.23")),    # .234 rounds down
+            (Decimal("1.225"), Decimal("1.23")),    # .225 rounds up (HALF_UP)
+            (Decimal("-1.235"), Decimal("-1.24")),   # negative HALF_UP
+        ]
+        for val, expected in test_cases:
+            with self.subTest(value=val):
+                row = GenericRow([val], fields, RowKind.INSERT)
+                serialized = GenericRowSerializer.to_bytes(row)
+                result = GenericRowDeserializer.from_bytes(serialized, fields)
+                self.assertEqual(result.values[0], expected)
+
+    def test_decimal_precision_overflow_returns_null(self):
+        """Values exceeding declared precision should be stored as null."""
+        # DECIMAL(4, 2) can hold at most 2 integer + 2 fractional digits => 
max 99.99
+        fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))]
+
+        # 999.99 needs 5 digits total, exceeds precision=4
+        row = GenericRow([Decimal("999.99")], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertIsNone(result.values[0])
+
+        # 99.999 rounds to 100.00 (5 digits), also overflows
+        row2 = GenericRow([Decimal("99.999")], fields, RowKind.INSERT)
+        serialized2 = GenericRowSerializer.to_bytes(row2)
+        result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+        self.assertIsNone(result2.values[0])
+
+        # 99.99 fits exactly in DECIMAL(4, 2)
+        row3 = GenericRow([Decimal("99.99")], fields, RowKind.INSERT)
+        serialized3 = GenericRowSerializer.to_bytes(row3)
+        result3 = GenericRowDeserializer.from_bytes(serialized3, fields)
+        self.assertEqual(result3.values[0], Decimal("99.99"))
+
+    def test_decimal_precision_overflow_high_precision(self):
+        """Precision overflow check also works for non-compact decimals."""
+        # DECIMAL(20, 5) can hold 15 integer + 5 fractional digits
+        fields = [DataField(0, "d", AtomicType("DECIMAL(20, 5)"))]
+
+        # This value fits: 15 integer digits + 5 fractional
+        row = GenericRow([Decimal("123456789012345.12345")], fields, 
RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], Decimal("123456789012345.12345"))
+
+        # This value overflows: 16 integer digits + 5 fractional = 21 > 20
+        row2 = GenericRow([Decimal("1234567890123456.12345")], fields, 
RowKind.INSERT)
+        serialized2 = GenericRowSerializer.to_bytes(row2)
+        result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+        self.assertIsNone(result2.values[0])
+
+    def test_decimal_deserialization_precision_overflow_non_compact(self):
+        """Non-compact decimal deserialization returns None if precision 
overflows."""
+        # Serialize with DECIMAL(38, 5) which fits, then deserialize as 
DECIMAL(20, 5)
+        fields_wide = [DataField(0, "d", AtomicType("DECIMAL(38, 5)"))]
+        fields_narrow = [DataField(0, "d", AtomicType("DECIMAL(20, 5)"))]
+
+        # 21 digits total exceeds precision=20
+        row = GenericRow([Decimal("1234567890123456.12345")], fields_wide, 
RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields_narrow)
+        self.assertIsNone(result.values[0])
+
+    def test_decimal_deserialization_invalid_precision(self):
+        """Deserialization with precision <= 0 raises ValueError."""
+        fields_valid = [DataField(0, "d", AtomicType("DECIMAL(10, 2)"))]
+        row = GenericRow([Decimal("1.23")], fields_valid, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        fields_bad = [DataField(0, "d", AtomicType("DECIMAL(0, 2)"))]
+        with self.assertRaises(ValueError):
+            GenericRowDeserializer.from_bytes(serialized, fields_bad)
+
+    def test_decimal_bare_defaults_to_10_0(self):
+        """Bare DECIMAL must match Java DecimalType.DEFAULT_PRECISION=10,
+        DEFAULT_SCALE=0 — compact layout, integer values round-trip."""
+        fields = [DataField(0, "d", AtomicType("DECIMAL"))]
+        row = GenericRow([Decimal("42")], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        data = serialized[4:]
+        fixed_part_size = 8 + 1 * 8
+        self.assertEqual(len(data), fixed_part_size)
+
+        unscaled_long = struct.unpack('<q', data[8:16])[0]
+        self.assertEqual(unscaled_long, 42)
+
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], Decimal("42"))
+
+    def test_decimal_bare_numeric_defaults_to_10_0(self):
+        """Bare NUMERIC aliases DECIMAL with the same default 
precision/scale."""
+        fields = [DataField(0, "d", AtomicType("NUMERIC"))]
+        row = GenericRow([Decimal("123")], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], Decimal("123"))
+
+    def test_unscaled_helper_basic(self):
+        cases = [
+            (Decimal("0.05"), 4, 2, (5, False)),
+            (Decimal("-0.05"), 4, 2, (-5, False)),
+            (Decimal("0"), 4, 2, (0, False)),
+            (Decimal("0E-10"), 38, 10, (0, False)),
+            (Decimal("42"), 10, 0, (42, False)),
+            (Decimal("-42"), 10, 0, (-42, False)),
+        ]
+        for d, precision, scale, expected in cases:
+            with self.subTest(d=d, p=precision, s=scale):
+                self.assertEqual(
+                    _decimal_to_unscaled_with_check(d, precision, scale),
+                    expected,
+                )
+
+    def test_unscaled_helper_preserves_38_digit_precision(self):
+        unscaled, overflow = _decimal_to_unscaled_with_check(
+            Decimal("12345678901234567890.1234567890"), 38, 10)
+        self.assertFalse(overflow)
+        self.assertEqual(unscaled, 123456789012345678901234567890)
+
+        unscaled_neg, overflow_neg = _decimal_to_unscaled_with_check(
+            Decimal("-99999999999999999999.9999999999"), 38, 10)
+        self.assertFalse(overflow_neg)
+        self.assertEqual(unscaled_neg, -999999999999999999999999999999)
+
+    def test_unscaled_helper_half_up_rounding(self):
+        cases = [
+            (Decimal("1.235"), 10, 2, 124),
+            (Decimal("1.234"), 10, 2, 123),
+            (Decimal("1.225"), 10, 2, 123),
+            (Decimal("-1.235"), 10, 2, -124),
+        ]
+        for d, precision, scale, expected_unscaled in cases:
+            with self.subTest(d=d):
+                unscaled, overflow = _decimal_to_unscaled_with_check(d, 
precision, scale)
+                self.assertFalse(overflow)
+                self.assertEqual(unscaled, expected_unscaled)
+
+    def test_unscaled_helper_overflow_flag(self):
+        _, overflow = _decimal_to_unscaled_with_check(Decimal("999.99"), 4, 2)
+        self.assertTrue(overflow)
+        _, overflow_round = _decimal_to_unscaled_with_check(Decimal("99.999"), 
4, 2)
+        self.assertTrue(overflow_round)
+        unscaled_ok, overflow_ok = 
_decimal_to_unscaled_with_check(Decimal("99.99"), 4, 2)
+        self.assertFalse(overflow_ok)
+        self.assertEqual(unscaled_ok, 9999)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/timestamp_test.py 
b/paimon-python/pypaimon/tests/timestamp_test.py
new file mode 100644
index 0000000000..ef09066f90
--- /dev/null
+++ b/paimon-python/pypaimon/tests/timestamp_test.py
@@ -0,0 +1,225 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import struct
+import unittest
+from datetime import datetime
+from decimal import Decimal
+
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
+                                            GenericRowSerializer)
+from pypaimon.table.row.row_kind import RowKind
+
+# Java-golden epoch millis, computed from Timestamp.fromLocalDateTime:
+#   millisecond = epochDay(date) * 86_400_000 + nanoOfDay(time) / 1_000_000
+#   nanoOfMillisecond = nanoOfDay(time) % 1_000_000
+# These are deterministic, independent of host timezone, and match Java's
+# BinaryRowWriter wire format exactly. Do NOT compute them via any pypaimon
+# helper — the whole point is to have an external oracle.
+TS_2025_04_08_10_30_00_123456 = datetime(2025, 4, 8, 10, 30, 0, 123456)
+TS_2025_04_08_10_30_00_123456_MILLIS = 1_744_108_200_123
+TS_2025_04_08_10_30_00_123456_NANO_OF_MS = 456_000
+
+TS_2025_01_01_MILLIS = 1_735_689_600_000  # datetime(2025, 1, 1, 0, 0, 0)
+TS_1969_07_20_20_17_00_MILLIS = -14_182_980_000  # datetime(1969, 7, 20, 20, 17, 0)
+
+
+class TimestampTest(unittest.TestCase):
+    """Tests for timestamp serialization/deserialization in GenericRow.
+
+    Semantic contract: naive datetime fields are interpreted as if in UTC,
+    matching Java's Timestamp.fromLocalDateTime (Timestamp.java:195). All
+    oracles below are hand-computed from Java's algorithm, not from any
+    pypaimon helper.
+    """
+
+    def test_timestamp_compact_binary_format(self):
+        """Compact layout: raw epoch millis in fixed slot, no variable area."""
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+        row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        data = serialized[4:]
+        null_bits_size = 8
+        fixed_part_size = null_bits_size + 1 * 8
+        self.assertEqual(len(data), fixed_part_size)
+
+        millis = struct.unpack('<q', data[null_bits_size:null_bits_size + 8])[0]
+        self.assertEqual(millis, TS_2025_04_08_10_30_00_123456_MILLIS)
+
+    def test_timestamp_compact_round_trip(self):
+        """Compact round-trip preserves the datetime down to milliseconds."""
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+        ts = datetime(2025, 4, 8, 10, 30, 0, 123000)  # only millis meaningful
+        row = GenericRow([ts], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], ts)
+
+    def test_timestamp_non_compact_binary_format(self):
+        """Non-compact layout: (cursor<<32)|nanoOfMilli in fixed slot,
+        epoch millis long in variable area. Both oracles come from Java."""
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP(6)"))]
+        row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        data = serialized[4:]
+        null_bits_size = 8
+        fixed_part_size = null_bits_size + 1 * 8
+        self.assertEqual(len(data), fixed_part_size + 8)
+
+        offset_and_nano = struct.unpack('<q', data[null_bits_size:null_bits_size + 8])[0]
+        cursor = (offset_and_nano >> 32) & 0xFFFFFFFF
+        nano_of_millisecond = offset_and_nano & 0xFFFFFFFF
+        self.assertEqual(cursor, fixed_part_size)
+        self.assertEqual(nano_of_millisecond, TS_2025_04_08_10_30_00_123456_NANO_OF_MS)
+
+        var_millis = struct.unpack('<q', data[cursor:cursor + 8])[0]
+        self.assertEqual(var_millis, TS_2025_04_08_10_30_00_123456_MILLIS)
+
+    def test_timestamp_non_compact_round_trip(self):
+        """Non-compact round-trip preserves microseconds."""
+        for type_str in ["TIMESTAMP(6)", "TIMESTAMP(9)"]:
+            with self.subTest(type=type_str):
+                fields = [DataField(0, "ts", AtomicType(type_str))]
+                row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+                serialized = GenericRowSerializer.to_bytes(row)
+                result = GenericRowDeserializer.from_bytes(serialized, fields)
+                self.assertEqual(result.values[0], TS_2025_04_08_10_30_00_123456)
+
+    def test_timestamp_non_compact_null(self):
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP(6)"))]
+        row = GenericRow([None], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertIsNone(result.values[0])
+
+    def test_timestamp_boundary_precision(self):
+        """Precision 3 is last compact, precision 4 is first non-compact."""
+        fixed_part_size = 8 + 1 * 8
+
+        fields_3 = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+        s_3 = GenericRowSerializer.to_bytes(
+            GenericRow([TS_2025_04_08_10_30_00_123456], fields_3, RowKind.INSERT))
+        self.assertEqual(len(s_3[4:]), fixed_part_size)
+
+        fields_4 = [DataField(0, "ts", AtomicType("TIMESTAMP(4)"))]
+        s_4 = GenericRowSerializer.to_bytes(
+            GenericRow([TS_2025_04_08_10_30_00_123456], fields_4, RowKind.INSERT))
+        self.assertEqual(len(s_4[4:]), fixed_part_size + 8)
+
+    def test_timestamp_mixed_with_other_types(self):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+            DataField(2, "ts_compact", AtomicType("TIMESTAMP(3)")),
+            DataField(3, "ts_non_compact", AtomicType("TIMESTAMP(6)")),
+            DataField(4, "dec", AtomicType("DECIMAL(38, 10)")),
+        ]
+
+        ts_compact = datetime(2025, 1, 1, 0, 0, 0)
+        dec_val = Decimal("12345678901234567890.1234567890")
+
+        row = GenericRow(
+            [42, "hello", ts_compact, TS_2025_04_08_10_30_00_123456, dec_val],
+            fields, RowKind.INSERT
+        )
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+        self.assertEqual(result.values[0], 42)
+        self.assertEqual(result.values[1], "hello")
+        self.assertEqual(result.values[2], ts_compact)
+        self.assertEqual(result.values[3], TS_2025_04_08_10_30_00_123456)
+        self.assertEqual(result.values[4], dec_val)
+
+    def test_timestamp_default_precision_is_six(self):
+        """Bare TIMESTAMP must match Java's TimestampType.DEFAULT_PRECISION = 6,
+        i.e. use the non-compact layout and preserve microseconds."""
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP"))]
+        row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+
+        data = serialized[4:]
+        null_bits_size = 8
+        fixed_part_size = null_bits_size + 1 * 8
+        # non-compact layout must emit an 8-byte variable area
+        self.assertEqual(len(data), fixed_part_size + 8)
+
+        offset_and_nano = struct.unpack('<q', data[null_bits_size:null_bits_size + 8])[0]
+        nano_of_millisecond = offset_and_nano & 0xFFFFFFFF
+        self.assertEqual(nano_of_millisecond, TS_2025_04_08_10_30_00_123456_NANO_OF_MS)
+
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], TS_2025_04_08_10_30_00_123456)
+
+    def test_timestamp_ltz_default_precision_is_six(self):
+        """Bare TIMESTAMP_LTZ must also default to precision 6 (non-compact)."""
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP_LTZ"))]
+        row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        fixed_part_size = 8 + 1 * 8
+        self.assertEqual(len(serialized[4:]), fixed_part_size + 8)
+
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], TS_2025_04_08_10_30_00_123456)
+
+    def test_timestamp_golden_2025_01_01(self):
+        """Hard-coded Java golden: 2025-01-01 00:00:00 UTC == 1_735_689_600_000 ms.
+        This is what catches a regression into local-time semantics, since the
+        value is picked so every tz offset produces a different number."""
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+        ts = datetime(2025, 1, 1, 0, 0, 0)
+        serialized = GenericRowSerializer.to_bytes(GenericRow([ts], fields, RowKind.INSERT))
+        data = serialized[4:]
+        millis = struct.unpack('<q', data[8:16])[0]
+        self.assertEqual(millis, TS_2025_01_01_MILLIS)
+
+    def test_timestamp_pre_epoch(self):
+        """Negative epoch millis round-trip correctly.
+
+        Golden: 1969-07-20 20:17:00 UTC == -14_182_980_000 ms (Java-computed)."""
+        fields3 = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+        ts_pre = datetime(1969, 7, 20, 20, 17, 0)
+        serialized = GenericRowSerializer.to_bytes(GenericRow([ts_pre], fields3, RowKind.INSERT))
+        data = serialized[4:]
+        millis = struct.unpack('<q', data[8:16])[0]
+        self.assertEqual(millis, TS_1969_07_20_20_17_00_MILLIS)
+
+        result = GenericRowDeserializer.from_bytes(serialized, fields3)
+        self.assertEqual(result.values[0], ts_pre)
+
+        # Non-compact
+        fields6 = [DataField(0, "ts", AtomicType("TIMESTAMP(6)"))]
+        ts_pre_us = datetime(1969, 7, 20, 20, 17, 0, 123456)
+        row6 = GenericRow([ts_pre_us], fields6, RowKind.INSERT)
+        serialized6 = GenericRowSerializer.to_bytes(row6)
+        result6 = GenericRowDeserializer.from_bytes(serialized6, fields6)
+        self.assertEqual(result6.values[0], ts_pre_us)
+
+    def test_timestamp_ltz_non_compact(self):
+        fields = [DataField(0, "ts", AtomicType("TIMESTAMP_LTZ(6)"))]
+        ts = datetime(2025, 4, 8, 10, 30, 0, 654321)
+        row = GenericRow([ts], fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        result = GenericRowDeserializer.from_bytes(serialized, fields)
+        self.assertEqual(result.values[0], ts)
+
+
+if __name__ == '__main__':
+    unittest.main()


Reply via email to