This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9301ab76ee [python] Fix non-compact decimal and timestamp
serialization in BinaryRow (#7608)
9301ab76ee is described below
commit 9301ab76ee9b8631a1b766c2d44c627aff319b7b
Author: Jiajia Li <[email protected]>
AuthorDate: Sun May 24 10:04:25 2026 +0800
[python] Fix non-compact decimal and timestamp serialization in BinaryRow
(#7608)
Previously, Python's `GenericRowSerializer` treated all decimal and
timestamp fields as using the compact format (value stored directly in the
8-byte fixed slot). Java's BinaryRow uses two formats:
| Type | Compact | Non-compact |
|------|---------|-------------|
| DECIMAL | precision ≤ 18: unscaled long in fixed slot | precision > 18: big-endian bytes in variable area, `(offset << 32 \| length)` in fixed slot |
| TIMESTAMP | precision ≤ 3: epoch millis in fixed slot | precision > 3: epoch millis in variable area, `(offset << 32 \| nanoOfMillisecond)` in fixed slot |
---
paimon-python/pypaimon/table/row/generic_row.py | 217 ++++++++++++---
paimon-python/pypaimon/tests/decimal_test.py | 356 ++++++++++++++++++++++++
paimon-python/pypaimon/tests/timestamp_test.py | 225 +++++++++++++++
3 files changed, 759 insertions(+), 39 deletions(-)
diff --git a/paimon-python/pypaimon/table/row/generic_row.py
b/paimon-python/pypaimon/table/row/generic_row.py
index e224fb3495..79f898d1f2 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -15,17 +15,87 @@
# specific language governing permissions and limitations
# under the License.
+import calendar
+import decimal
import struct
+from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Any, List, Union
-from dataclasses import dataclass
-
from pypaimon.schema.data_types import AtomicType, DataField, DataType
from pypaimon.table.row.binary_row import BinaryRow
-from pypaimon.table.row.internal_row import InternalRow, RowKind
from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.internal_row import InternalRow, RowKind
+
+_DECIMAL_CTX = decimal.Context(prec=100, rounding=decimal.ROUND_HALF_UP)
+
+
+def _decimal_to_unscaled_with_check(d: Decimal, precision: int, scale: int):
+ """Round decimal with HALF_UP, check precision overflow, and return
unscaled value.
+ Returns (unscaled_int, True) on overflow, (unscaled_int, False) on
success."""
+ rounded = d.quantize(Decimal(10) ** -scale, context=_DECIMAL_CTX)
+ _, digits, _ = rounded.as_tuple()
+ if rounded != 0 and len(digits) > precision:
+ return 0, True
+ return int(rounded.scaleb(scale, context=_DECIMAL_CTX)), False
+
+
+def _parse_type_precision_scale(data_type):
+ """Parse precision and scale from type string like DECIMAL(38, 10).
+
+ Falls back to Java-side defaults for parameter-less types:
+ - DECIMAL / NUMERIC -> (10, 0) (DecimalType.DEFAULT_PRECISION /
DEFAULT_SCALE)
+ - TIMESTAMP / TIMESTAMP_LTZ / TIMESTAMP WITH LOCAL TIME ZONE -> (6, 0)
+ (TimestampType.DEFAULT_PRECISION)
+ """
+ type_str = str(data_type).upper().strip()
+ if '(' in type_str and ')' in type_str:
+ try:
+ params_str = type_str.split('(', 1)[1].split(')', 1)[0]
+ parts = [p.strip() for p in params_str.split(',')]
+ precision = int(parts[0])
+ scale = int(parts[1]) if len(parts) > 1 else 0
+ return precision, scale
+ except (ValueError, IndexError):
+ pass
+ # Strip trailing NOT NULL / nullability suffixes and any parenthesised
+ # params — handles "DECIMAL NOT NULL" and malformed "DECIMAL()" alike.
+ head = type_str.split('(', 1)[0].split()[0] if type_str.strip() else ''
+ if head in ('DECIMAL', 'NUMERIC'):
+ return 10, 0
+ if head in ('TIMESTAMP', 'TIMESTAMP_LTZ'):
+ return 6, 0
+ return 0, 0
+
+
+_EPOCH = datetime(1970, 1, 1)
+
+
+def _datetime_to_millis_and_nanos(value: datetime):
+ """Convert datetime to (epoch_millis, nano_of_millisecond) without float
arithmetic.
+
+ Python's datetime is microsecond-resolution, so nano_of_millisecond is
+ always a multiple of 1000 and sub-microsecond precision is lost. For
+ TIMESTAMP(7..9) columns the lower three nano digits will always serialise
+ as zero — round-trip through pypaimon is microsecond-faithful, not
+ nanosecond-faithful.
+ """
+ epoch_seconds = calendar.timegm(value.timetuple())
+ millis = epoch_seconds * 1000 + value.microsecond // 1000
+ nano_of_millisecond = (value.microsecond % 1000) * 1000
+ return millis, nano_of_millisecond
+
+
+def _millis_nanos_to_datetime(millis: int, nano_of_millisecond: int = 0) ->
datetime:
+ """Convert (epoch_millis, nano_of_millisecond) to datetime. Nanos
truncated to micros."""
+ total_micros = millis * 1000 + nano_of_millisecond // 1000
+ seconds = total_micros // 1_000_000
+ micros = total_micros % 1_000_000
+ if micros < 0:
+ seconds -= 1
+ micros += 1_000_000
+ return _EPOCH + timedelta(seconds=seconds, microseconds=micros)
@dataclass
@@ -232,26 +302,54 @@ class GenericRowDeserializer:
return BlobData.from_bytes(binary_data)
@classmethod
- def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset:
int, data_type: DataType) -> Decimal:
- unscaled_long = struct.unpack('<q',
bytes_data[field_offset:field_offset + 8])[0]
- type_str = str(data_type)
- if '(' in type_str and ')' in type_str:
- try:
- precision_scale = type_str.split('(')[1].split(')')[0]
- if ',' in precision_scale:
- scale = int(precision_scale.split(',')[1])
- else:
- scale = 0
- except:
- scale = 0
+ def _unscaled_to_decimal(cls, unscaled_value: int, scale: int) -> Decimal:
+ sign = 0 if unscaled_value >= 0 else 1
+ digits = tuple(int(d) for d in str(abs(unscaled_value))) if
unscaled_value != 0 else (0,)
+ return Decimal((sign, digits, -scale))
+
+ @classmethod
+ def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset:
int, data_type: DataType):
+ """Parse a decimal field, returning None when the on-disk value exceeds
+ the declared precision. Mirrors Java's `BinaryRow.getDecimal()`, which
+ returns null on overflow; callers must treat the field as a regular
+ nullable cell (the surrounding GenericRow API already does this — None
+ flows out as a null column value, no further handling required).
+ """
+ precision, scale = _parse_type_precision_scale(data_type)
+ if precision <= 0:
+ raise ValueError(f"Decimal requires precision > 0, got
{precision}")
+ if precision <= 18:
+ # Compact: unscaled long in fixed part
+ unscaled_long = struct.unpack('<q',
bytes_data[field_offset:field_offset + 8])[0]
+ return cls._unscaled_to_decimal(unscaled_long, scale)
else:
- scale = 0
- return Decimal(unscaled_long) / (10 ** scale)
+ # Non-compact: (cursor << 32 | byte_length) in fixed part, bytes
in var area
+ offset_and_len = struct.unpack('<q',
bytes_data[field_offset:field_offset + 8])[0]
+ cursor = (offset_and_len >> 32) & 0xFFFFFFFF
+ byte_length = offset_and_len & 0xFFFFFFFF
+ var_offset = base_offset + cursor
+ unscaled_bytes = bytes_data[var_offset:var_offset + byte_length]
+ unscaled_value = int.from_bytes(unscaled_bytes, byteorder='big',
signed=True)
+ result = cls._unscaled_to_decimal(unscaled_value, scale)
+ _, digits, _ = result.as_tuple()
+ if result != 0 and len(digits) > precision:
+ return None
+ return result
@classmethod
def _parse_timestamp(cls, bytes_data: bytes, base_offset: int,
field_offset: int, data_type: DataType) -> datetime:
- millis = struct.unpack('<q', bytes_data[field_offset:field_offset +
8])[0]
- return datetime.fromtimestamp(millis / 1000.0, tz=None)
+ precision, _ = _parse_type_precision_scale(data_type)
+ if precision <= 3:
+ # Compact: epoch millis in fixed part
+ millis = struct.unpack('<q', bytes_data[field_offset:field_offset
+ 8])[0]
+ return _millis_nanos_to_datetime(millis)
+ else:
+ # Non-compact: (cursor << 32 | nanoOfMillisecond) in fixed part,
millis in var area
+ offset_and_nanos = struct.unpack('<q',
bytes_data[field_offset:field_offset + 8])[0]
+ nano_of_millisecond = offset_and_nanos & 0xFFFFFFFF
+ sub_offset = (offset_and_nanos >> 32) & 0xFFFFFFFF
+ millis = struct.unpack('<q', bytes_data[base_offset +
sub_offset:base_offset + sub_offset + 8])[0]
+ return _millis_nanos_to_datetime(millis, nano_of_millisecond)
@classmethod
def _parse_date(cls, bytes_data: bytes, field_offset: int) -> date:
@@ -300,9 +398,46 @@ class GenericRowSerializer:
raise ValueError(f"BinaryRow only support AtomicType yet, meet
{field.type.__class__}")
type_name = field.type.type.upper()
- if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR',
'STRING',
- 'BINARY', 'VARBINARY',
'BYTES', 'BLOB']):
- if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR',
'STRING']):
+ is_var_len_type = any(type_name.startswith(p) for p in [
+ 'CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES',
'BLOB'])
+ is_decimal_type = type_name.startswith('DECIMAL') or
type_name.startswith('NUMERIC')
+ is_timestamp_type = type_name.startswith('TIMESTAMP')
+ if is_decimal_type or is_timestamp_type:
+ precision, scale = _parse_type_precision_scale(field.type)
+ else:
+ precision, scale = 0, 0
+ is_high_precision_decimal = is_decimal_type and precision > 18
+ is_non_compact_timestamp = is_timestamp_type and precision > 3
+
+ if is_decimal_type and value is not None:
+ d = value if isinstance(value, Decimal) else
Decimal(str(value))
+ unscaled_value, overflow = _decimal_to_unscaled_with_check(d,
precision, scale)
+ if overflow:
+ cls._set_null_bit(fixed_part, 0, i)
+ struct.pack_into('<q', fixed_part, field_fixed_offset, 0)
+ continue
+
+ if is_non_compact_timestamp:
+ # Non-compact: millis in var area, (offset << 32 |
nanoOfMilli) in fixed part
+ if value.tzinfo is not None:
+ raise RuntimeError("datetime tzinfo not supported yet")
+ ts_millis, nano_of_millisecond =
_datetime_to_millis_and_nanos(value)
+ var_value_bytes = struct.pack('<q', ts_millis)
+ offset_in_variable_part = current_variable_offset
+ variable_part_data.append(var_value_bytes)
+ current_variable_offset += 8
+ absolute_offset = fixed_part_size + offset_in_variable_part
+ offset_and_nano = (absolute_offset << 32) | nano_of_millisecond
+ struct.pack_into('<q', fixed_part, field_fixed_offset,
offset_and_nano)
+ elif is_var_len_type or is_high_precision_decimal:
+ if is_high_precision_decimal:
+ # Big-endian signed bytes
+ if unscaled_value == 0:
+ value_bytes = b'\x00'
+ else:
+ byte_length = (unscaled_value.bit_length() + 8) // 8
# +8 for sign bit
+ value_bytes = unscaled_value.to_bytes(byte_length,
byteorder='big', signed=True)
+ elif any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR',
'STRING']):
value_bytes = str(value).encode('utf-8')
elif type_name == 'BLOB':
value_bytes = value.to_data()
@@ -310,14 +445,18 @@ class GenericRowSerializer:
value_bytes = bytes(value)
length = len(value_bytes)
- if length <= cls.MAX_FIX_PART_DATA_SIZE:
+ if length <= cls.MAX_FIX_PART_DATA_SIZE and not
is_high_precision_decimal:
fixed_part[field_fixed_offset: field_fixed_offset +
length] = value_bytes
for j in range(length, 7):
fixed_part[field_fixed_offset + j] = 0
header_byte = 0x80 | length
fixed_part[field_fixed_offset + 7] = header_byte
else:
- var_length =
cls._round_number_of_bytes_to_nearest_word(len(value_bytes))
+ # Non-compact decimal: fixed 16 bytes; others: 8-byte
aligned
+ if is_high_precision_decimal:
+ var_length = 16
+ else:
+ var_length =
cls._round_number_of_bytes_to_nearest_word(len(value_bytes))
var_value_bytes = value_bytes + b'\x00' * (var_length -
length)
offset_in_variable_part = current_variable_offset
variable_part_data.append(var_value_bytes)
@@ -364,8 +503,18 @@ class GenericRowSerializer:
elif type_name in ['DOUBLE']:
return cls._serialize_double(value)
elif type_name.startswith('DECIMAL') or
type_name.startswith('NUMERIC'):
+ precision, _ = _parse_type_precision_scale(data_type)
+ if precision > 18:
+ raise ValueError(
+ f"Non-compact decimal (precision={precision}) must be
serialized "
+ f"via the variable-length path in to_bytes(), not
_serialize_field_value()")
return cls._serialize_decimal(value, data_type)
elif type_name.startswith('TIMESTAMP'):
+ precision = _parse_type_precision_scale(data_type)[0]
+ if precision > 3:
+ raise ValueError(
+ f"Non-compact timestamp (precision={precision}) must be
serialized "
+ f"via the variable-length path in to_bytes(), not
_serialize_field_value()")
return cls._serialize_timestamp(value)
elif type_name in ['DATE']:
return cls._serialize_date(value) + b'\x00' * 4
@@ -404,27 +553,17 @@ class GenericRowSerializer:
@classmethod
def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
- type_str = str(data_type)
- if '(' in type_str and ')' in type_str:
- try:
- precision_scale = type_str.split('(')[1].split(')')[0]
- if ',' in precision_scale:
- scale = int(precision_scale.split(',')[1])
- else:
- scale = 0
- except:
- scale = 0
- else:
- scale = 0
-
- unscaled_value = int(value * (10 ** scale))
+ """Compact decimal: unscaled long in fixed part."""
+ precision, scale = _parse_type_precision_scale(data_type)
+ d = value if isinstance(value, Decimal) else Decimal(str(value))
+ unscaled_value, _ = _decimal_to_unscaled_with_check(d, precision,
scale)
return struct.pack('<q', unscaled_value)
@classmethod
def _serialize_timestamp(cls, value: datetime) -> bytes:
if value.tzinfo is not None:
raise RuntimeError("datetime tzinfo not supported yet")
- millis = int(value.timestamp() * 1000)
+ millis, _ = _datetime_to_millis_and_nanos(value)
return struct.pack('<q', millis)
@classmethod
diff --git a/paimon-python/pypaimon/tests/decimal_test.py
b/paimon-python/pypaimon/tests/decimal_test.py
new file mode 100644
index 0000000000..fa90b3d8a2
--- /dev/null
+++ b/paimon-python/pypaimon/tests/decimal_test.py
@@ -0,0 +1,356 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import struct
+import unittest
+from decimal import Decimal
+
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
+ GenericRowSerializer,
+ _decimal_to_unscaled_with_check)
+from pypaimon.table.row.row_kind import RowKind
+
+
+class DecimalTest(unittest.TestCase):
+ """Tests for decimal serialization/deserialization in GenericRow."""
+
+ def test_decimal_compact(self):
+ """Test compact decimal (precision <= 18) round-trip."""
+ # precision=4, scale=2, unscaled=5 => 0.05
+ fields = [
+ DataField(0, "d", AtomicType("DECIMAL(4, 2)")),
+ DataField(1, "d2", AtomicType("DECIMAL(4, 2)")),
+ ]
+ row = GenericRow([Decimal("0.05"), None], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+ self.assertEqual(str(result.values[0]), "0.05")
+ self.assertIsNone(result.values[1])
+
+ # Another compact value: 0.06
+ row2 = GenericRow([Decimal("0.06"), None], fields, RowKind.INSERT)
+ serialized2 = GenericRowSerializer.to_bytes(row2)
+ result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+ self.assertEqual(str(result2.values[0]), "0.06")
+
+ def test_decimal_not_compact(self):
+ """Test non-compact decimal (precision > 18) round-trip."""
+ # precision=25, scale=5
+ fields = [
+ DataField(0, "d", AtomicType("DECIMAL(25, 5)")),
+ DataField(1, "d2", AtomicType("DECIMAL(25, 5)")),
+ ]
+ row = GenericRow([Decimal("5.55000"), None], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+ self.assertEqual(str(result.values[0]), "5.55000")
+ self.assertIsNone(result.values[1])
+
+ # Another value: 6.55
+ row2 = GenericRow([Decimal("6.55000"), None], fields, RowKind.INSERT)
+ serialized2 = GenericRowSerializer.to_bytes(row2)
+ result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+ self.assertEqual(str(result2.values[0]), "6.55000")
+
+ # Negative value
+ row3 = GenericRow([Decimal("-123.45000"), None], fields,
RowKind.INSERT)
+ serialized3 = GenericRowSerializer.to_bytes(row3)
+ result3 = GenericRowDeserializer.from_bytes(serialized3, fields)
+ self.assertEqual(str(result3.values[0]), "-123.45000")
+
+ def test_decimal_high_precision_large_value(self):
+ """Test high-precision decimal with large values that exceed long
range."""
+ fields = [DataField(0, "d", AtomicType("DECIMAL(38, 10)"))]
+
+ test_values = [
+ Decimal("12345678901234567890.1234567890"),
+ Decimal("-99999999999999999999.9999999999"),
+ Decimal("0E-10"),
+ ]
+
+ for val in test_values:
+ with self.subTest(value=val):
+ row = GenericRow([val], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], val)
+
+ def test_decimal_mixed_with_other_types(self):
+ """Test decimal fields mixed with other types in a single row."""
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("STRING")),
+ DataField(2, "compact_dec", AtomicType("DECIMAL(10, 2)")),
+ DataField(3, "high_dec", AtomicType("DECIMAL(38, 2)")),
+ DataField(4, "score", AtomicType("DOUBLE")),
+ ]
+
+ row = GenericRow(
+ [42, "test_row", Decimal("12345.67"), Decimal("12312455.22"),
3.14],
+ fields, RowKind.INSERT
+ )
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+ self.assertEqual(result.values[0], 42)
+ self.assertEqual(result.values[1], "test_row")
+ self.assertEqual(result.values[2], Decimal("12345.67"))
+ self.assertEqual(result.values[3], Decimal("12312455.22"))
+ self.assertAlmostEqual(result.values[4], 3.14)
+
+ def test_decimal_compact_binary_format(self):
+ """Verify compact decimal binary layout: unscaled long in fixed
part."""
+ fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))]
+ row = GenericRow([Decimal("0.05")], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ # Skip 4-byte arity prefix
+ data = serialized[4:]
+ null_bits_size = 8 # ((1 + 63 + 8) // 64) * 8
+ field_offset = null_bits_size
+ unscaled_long = struct.unpack('<q', data[field_offset:field_offset +
8])[0]
+ # Decimal("0.05") with scale=2 => unscaled = 5
+ self.assertEqual(unscaled_long, 5)
+
+ def test_decimal_not_compact_binary_format(self):
+ """Verify non-compact decimal binary layout: (offset << 32 | length)
in fixed part,
+ 16-byte big-endian unscaled bytes in variable part.
+ """
+ fields = [DataField(0, "d", AtomicType("DECIMAL(25, 5)"))]
+ row = GenericRow([Decimal("5.55000")], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ # Skip 4-byte arity prefix
+ data = serialized[4:]
+ null_bits_size = 8
+ field_offset = null_bits_size
+ fixed_part_size = null_bits_size + 1 * 8
+
+ offset_and_len = struct.unpack('<q', data[field_offset:field_offset +
8])[0]
+ cursor = (offset_and_len >> 32) & 0xFFFFFFFF
+ byte_length = offset_and_len & 0xFFFFFFFF
+
+ # cursor should point to the variable area (== fixed_part_size)
+ self.assertEqual(cursor, fixed_part_size)
+ # variable area should be exactly 16 bytes
+ var_area = data[cursor:]
+ self.assertEqual(len(var_area), 16)
+ # unscaled bytes are big-endian signed
+ unscaled_bytes = data[cursor:cursor + byte_length]
+ unscaled_value = int.from_bytes(unscaled_bytes, byteorder='big',
signed=True)
+ # Decimal("5.55000") with scale=5 => unscaled = 555000
+ self.assertEqual(unscaled_value, 555000)
+
+ def test_decimal_boundary_precision(self):
+ """Test boundary: DECIMAL(18, ...) is compact, DECIMAL(19, ...) is
non-compact."""
+ # precision=18: last compact
+ fields_18 = [DataField(0, "d", AtomicType("DECIMAL(18, 4)"))]
+ row_18 = GenericRow([Decimal("12345678901234.5678")], fields_18,
RowKind.INSERT)
+ s_18 = GenericRowSerializer.to_bytes(row_18)
+ r_18 = GenericRowDeserializer.from_bytes(s_18, fields_18)
+ self.assertEqual(r_18.values[0], Decimal("12345678901234.5678"))
+ # verify compact: no variable area beyond fixed part
+ data_18 = s_18[4:]
+ null_bits_size = 8
+ fixed_part_size = null_bits_size + 1 * 8
+ self.assertEqual(len(data_18), fixed_part_size)
+
+ # precision=19: first non-compact
+ fields_19 = [DataField(0, "d", AtomicType("DECIMAL(19, 4)"))]
+ row_19 = GenericRow([Decimal("12345678901234.5678")], fields_19,
RowKind.INSERT)
+ s_19 = GenericRowSerializer.to_bytes(row_19)
+ r_19 = GenericRowDeserializer.from_bytes(s_19, fields_19)
+ self.assertEqual(r_19.values[0], Decimal("12345678901234.5678"))
+ # verify non-compact: has 16-byte variable area
+ data_19 = s_19[4:]
+ self.assertEqual(len(data_19), fixed_part_size + 16)
+
+ def test_decimal_zero_different_scales(self):
+ """Test zero value with different precisions and scales."""
+ test_cases = [
+ ("DECIMAL(38, 0)", Decimal("0")),
+ ("DECIMAL(38, 10)", Decimal("0E-10")),
+ ("DECIMAL(10, 2)", Decimal("0.00")),
+ ]
+ for type_str, val in test_cases:
+ with self.subTest(type=type_str):
+ fields = [DataField(0, "d", AtomicType(type_str))]
+ row = GenericRow([val], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], val)
+
+ def test_decimal_half_up_rounding(self):
+ """Excess fractional digits should be rounded with HALF_UP."""
+ fields = [DataField(0, "d", AtomicType("DECIMAL(10, 2)"))]
+
+ test_cases = [
+ (Decimal("1.999"), Decimal("2.00")), # .999 rounds up
+ (Decimal("1.235"), Decimal("1.24")), # .235 rounds up (HALF_UP)
+ (Decimal("1.234"), Decimal("1.23")), # .234 rounds down
+ (Decimal("1.225"), Decimal("1.23")), # .225 rounds up (HALF_UP)
+ (Decimal("-1.235"), Decimal("-1.24")), # negative HALF_UP
+ ]
+ for val, expected in test_cases:
+ with self.subTest(value=val):
+ row = GenericRow([val], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], expected)
+
+ def test_decimal_precision_overflow_returns_null(self):
+ """Values exceeding declared precision should be stored as null."""
+ # DECIMAL(4, 2) can hold at most 2 integer + 2 fractional digits =>
max 99.99
+ fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))]
+
+ # 999.99 needs 5 digits total, exceeds precision=4
+ row = GenericRow([Decimal("999.99")], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertIsNone(result.values[0])
+
+ # 99.999 rounds to 100.00 (5 digits), also overflows
+ row2 = GenericRow([Decimal("99.999")], fields, RowKind.INSERT)
+ serialized2 = GenericRowSerializer.to_bytes(row2)
+ result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+ self.assertIsNone(result2.values[0])
+
+ # 99.99 fits exactly in DECIMAL(4, 2)
+ row3 = GenericRow([Decimal("99.99")], fields, RowKind.INSERT)
+ serialized3 = GenericRowSerializer.to_bytes(row3)
+ result3 = GenericRowDeserializer.from_bytes(serialized3, fields)
+ self.assertEqual(result3.values[0], Decimal("99.99"))
+
+ def test_decimal_precision_overflow_high_precision(self):
+ """Precision overflow check also works for non-compact decimals."""
+ # DECIMAL(20, 5) can hold 15 integer + 5 fractional digits
+ fields = [DataField(0, "d", AtomicType("DECIMAL(20, 5)"))]
+
+ # This value fits: 15 integer digits + 5 fractional
+ row = GenericRow([Decimal("123456789012345.12345")], fields,
RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], Decimal("123456789012345.12345"))
+
+ # This value overflows: 16 integer digits + 5 fractional = 21 > 20
+ row2 = GenericRow([Decimal("1234567890123456.12345")], fields,
RowKind.INSERT)
+ serialized2 = GenericRowSerializer.to_bytes(row2)
+ result2 = GenericRowDeserializer.from_bytes(serialized2, fields)
+ self.assertIsNone(result2.values[0])
+
+ def test_decimal_deserialization_precision_overflow_non_compact(self):
+ """Non-compact decimal deserialization returns None if precision
overflows."""
+ # Serialize with DECIMAL(38, 5) which fits, then deserialize as
DECIMAL(20, 5)
+ fields_wide = [DataField(0, "d", AtomicType("DECIMAL(38, 5)"))]
+ fields_narrow = [DataField(0, "d", AtomicType("DECIMAL(20, 5)"))]
+
+ # 21 digits total exceeds precision=20
+ row = GenericRow([Decimal("1234567890123456.12345")], fields_wide,
RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields_narrow)
+ self.assertIsNone(result.values[0])
+
+ def test_decimal_deserialization_invalid_precision(self):
+ """Deserialization with precision <= 0 raises ValueError."""
+ fields_valid = [DataField(0, "d", AtomicType("DECIMAL(10, 2)"))]
+ row = GenericRow([Decimal("1.23")], fields_valid, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ fields_bad = [DataField(0, "d", AtomicType("DECIMAL(0, 2)"))]
+ with self.assertRaises(ValueError):
+ GenericRowDeserializer.from_bytes(serialized, fields_bad)
+
+ def test_decimal_bare_defaults_to_10_0(self):
+ """Bare DECIMAL must match Java DecimalType.DEFAULT_PRECISION=10,
+ DEFAULT_SCALE=0 — compact layout, integer values round-trip."""
+ fields = [DataField(0, "d", AtomicType("DECIMAL"))]
+ row = GenericRow([Decimal("42")], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ data = serialized[4:]
+ fixed_part_size = 8 + 1 * 8
+ self.assertEqual(len(data), fixed_part_size)
+
+ unscaled_long = struct.unpack('<q', data[8:16])[0]
+ self.assertEqual(unscaled_long, 42)
+
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], Decimal("42"))
+
+ def test_decimal_bare_numeric_defaults_to_10_0(self):
+ """Bare NUMERIC aliases DECIMAL with the same default
precision/scale."""
+ fields = [DataField(0, "d", AtomicType("NUMERIC"))]
+ row = GenericRow([Decimal("123")], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], Decimal("123"))
+
+ def test_unscaled_helper_basic(self):
+ cases = [
+ (Decimal("0.05"), 4, 2, (5, False)),
+ (Decimal("-0.05"), 4, 2, (-5, False)),
+ (Decimal("0"), 4, 2, (0, False)),
+ (Decimal("0E-10"), 38, 10, (0, False)),
+ (Decimal("42"), 10, 0, (42, False)),
+ (Decimal("-42"), 10, 0, (-42, False)),
+ ]
+ for d, precision, scale, expected in cases:
+ with self.subTest(d=d, p=precision, s=scale):
+ self.assertEqual(
+ _decimal_to_unscaled_with_check(d, precision, scale),
+ expected,
+ )
+
+ def test_unscaled_helper_preserves_38_digit_precision(self):
+ unscaled, overflow = _decimal_to_unscaled_with_check(
+ Decimal("12345678901234567890.1234567890"), 38, 10)
+ self.assertFalse(overflow)
+ self.assertEqual(unscaled, 123456789012345678901234567890)
+
+ unscaled_neg, overflow_neg = _decimal_to_unscaled_with_check(
+ Decimal("-99999999999999999999.9999999999"), 38, 10)
+ self.assertFalse(overflow_neg)
+ self.assertEqual(unscaled_neg, -999999999999999999999999999999)
+
+ def test_unscaled_helper_half_up_rounding(self):
+ cases = [
+ (Decimal("1.235"), 10, 2, 124),
+ (Decimal("1.234"), 10, 2, 123),
+ (Decimal("1.225"), 10, 2, 123),
+ (Decimal("-1.235"), 10, 2, -124),
+ ]
+ for d, precision, scale, expected_unscaled in cases:
+ with self.subTest(d=d):
+ unscaled, overflow = _decimal_to_unscaled_with_check(d,
precision, scale)
+ self.assertFalse(overflow)
+ self.assertEqual(unscaled, expected_unscaled)
+
+ def test_unscaled_helper_overflow_flag(self):
+ _, overflow = _decimal_to_unscaled_with_check(Decimal("999.99"), 4, 2)
+ self.assertTrue(overflow)
+ _, overflow_round = _decimal_to_unscaled_with_check(Decimal("99.999"),
4, 2)
+ self.assertTrue(overflow_round)
+ unscaled_ok, overflow_ok =
_decimal_to_unscaled_with_check(Decimal("99.99"), 4, 2)
+ self.assertFalse(overflow_ok)
+ self.assertEqual(unscaled_ok, 9999)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/timestamp_test.py
b/paimon-python/pypaimon/tests/timestamp_test.py
new file mode 100644
index 0000000000..ef09066f90
--- /dev/null
+++ b/paimon-python/pypaimon/tests/timestamp_test.py
@@ -0,0 +1,225 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import struct
+import unittest
+from datetime import datetime
+from decimal import Decimal
+
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
+ GenericRowSerializer)
+from pypaimon.table.row.row_kind import RowKind
+
+# Java-golden epoch millis, computed from Timestamp.fromLocalDateTime:
+# millisecond = epochDay(date) * 86_400_000 + nanoOfDay(time) / 1_000_000
+# nanoOfMillisecond = nanoOfDay(time) % 1_000_000
+# These are deterministic, independent of host timezone, and match Java's
+# BinaryRowWriter wire format exactly. Do NOT compute them via any pypaimon
+# helper — the whole point is to have an external oracle.
+TS_2025_04_08_10_30_00_123456 = datetime(2025, 4, 8, 10, 30, 0, 123456)
+TS_2025_04_08_10_30_00_123456_MILLIS = 1_744_108_200_123
+TS_2025_04_08_10_30_00_123456_NANO_OF_MS = 456_000
+
+TS_2025_01_01_MILLIS = 1_735_689_600_000 # datetime(2025, 1, 1, 0, 0, 0)
+TS_1969_07_20_20_17_00_MILLIS = -14_182_980_000 # datetime(1969, 7, 20, 20, 17, 0)
+
+
+class TimestampTest(unittest.TestCase):
+ """Tests for timestamp serialization/deserialization in GenericRow.
+
+ Semantic contract: naive datetime fields are interpreted as if in UTC,
+ matching Java's Timestamp.fromLocalDateTime (Timestamp.java:195). All
+ oracles below are hand-computed from Java's algorithm, not from any
+ pypaimon helper.
+ """
+
+ def test_timestamp_compact_binary_format(self):
+ """Compact layout: raw epoch millis in fixed slot, no variable area."""
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+ row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ data = serialized[4:]
+ null_bits_size = 8
+ fixed_part_size = null_bits_size + 1 * 8
+ self.assertEqual(len(data), fixed_part_size)
+
+ millis = struct.unpack('<q', data[null_bits_size:null_bits_size + 8])[0]
+ self.assertEqual(millis, TS_2025_04_08_10_30_00_123456_MILLIS)
+
+ def test_timestamp_compact_round_trip(self):
+ """Compact round-trip preserves the datetime down to milliseconds."""
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+ ts = datetime(2025, 4, 8, 10, 30, 0, 123000) # only millis meaningful
+ row = GenericRow([ts], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], ts)
+
+ def test_timestamp_non_compact_binary_format(self):
+ """Non-compact layout: (cursor<<32)|nanoOfMilli in fixed slot,
+ epoch millis long in variable area. Both oracles come from Java."""
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP(6)"))]
+ row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ data = serialized[4:]
+ null_bits_size = 8
+ fixed_part_size = null_bits_size + 1 * 8
+ self.assertEqual(len(data), fixed_part_size + 8)
+
+ offset_and_nano = struct.unpack('<q', data[null_bits_size:null_bits_size + 8])[0]
+ cursor = (offset_and_nano >> 32) & 0xFFFFFFFF
+ nano_of_millisecond = offset_and_nano & 0xFFFFFFFF
+ self.assertEqual(cursor, fixed_part_size)
+ self.assertEqual(nano_of_millisecond, TS_2025_04_08_10_30_00_123456_NANO_OF_MS)
+
+ var_millis = struct.unpack('<q', data[cursor:cursor + 8])[0]
+ self.assertEqual(var_millis, TS_2025_04_08_10_30_00_123456_MILLIS)
+
+ def test_timestamp_non_compact_round_trip(self):
+ """Non-compact round-trip preserves microseconds."""
+ for type_str in ["TIMESTAMP(6)", "TIMESTAMP(9)"]:
+ with self.subTest(type=type_str):
+ fields = [DataField(0, "ts", AtomicType(type_str))]
+ row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], TS_2025_04_08_10_30_00_123456)
+
+ def test_timestamp_non_compact_null(self):
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP(6)"))]
+ row = GenericRow([None], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertIsNone(result.values[0])
+
+ def test_timestamp_boundary_precision(self):
+ """Precision 3 is last compact, precision 4 is first non-compact."""
+ fixed_part_size = 8 + 1 * 8
+
+ fields_3 = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+ s_3 = GenericRowSerializer.to_bytes(
+ GenericRow([TS_2025_04_08_10_30_00_123456], fields_3, RowKind.INSERT))
+ self.assertEqual(len(s_3[4:]), fixed_part_size)
+
+ fields_4 = [DataField(0, "ts", AtomicType("TIMESTAMP(4)"))]
+ s_4 = GenericRowSerializer.to_bytes(
+ GenericRow([TS_2025_04_08_10_30_00_123456], fields_4, RowKind.INSERT))
+ self.assertEqual(len(s_4[4:]), fixed_part_size + 8)
+
+ def test_timestamp_mixed_with_other_types(self):
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("STRING")),
+ DataField(2, "ts_compact", AtomicType("TIMESTAMP(3)")),
+ DataField(3, "ts_non_compact", AtomicType("TIMESTAMP(6)")),
+ DataField(4, "dec", AtomicType("DECIMAL(38, 10)")),
+ ]
+
+ ts_compact = datetime(2025, 1, 1, 0, 0, 0)
+ dec_val = Decimal("12345678901234567890.1234567890")
+
+ row = GenericRow(
+ [42, "hello", ts_compact, TS_2025_04_08_10_30_00_123456, dec_val],
+ fields, RowKind.INSERT
+ )
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+
+ self.assertEqual(result.values[0], 42)
+ self.assertEqual(result.values[1], "hello")
+ self.assertEqual(result.values[2], ts_compact)
+ self.assertEqual(result.values[3], TS_2025_04_08_10_30_00_123456)
+ self.assertEqual(result.values[4], dec_val)
+
+ def test_timestamp_default_precision_is_six(self):
+ """Bare TIMESTAMP must match Java's TimestampType.DEFAULT_PRECISION = 6,
+ i.e. use the non-compact layout and preserve microseconds."""
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP"))]
+ row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+
+ data = serialized[4:]
+ null_bits_size = 8
+ fixed_part_size = null_bits_size + 1 * 8
+ # non-compact layout must emit an 8-byte variable area
+ self.assertEqual(len(data), fixed_part_size + 8)
+
+ offset_and_nano = struct.unpack('<q', data[null_bits_size:null_bits_size + 8])[0]
+ nano_of_millisecond = offset_and_nano & 0xFFFFFFFF
+ self.assertEqual(nano_of_millisecond, TS_2025_04_08_10_30_00_123456_NANO_OF_MS)
+
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], TS_2025_04_08_10_30_00_123456)
+
+ def test_timestamp_ltz_default_precision_is_six(self):
+ """Bare TIMESTAMP_LTZ must also default to precision 6 (non-compact)."""
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP_LTZ"))]
+ row = GenericRow([TS_2025_04_08_10_30_00_123456], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ fixed_part_size = 8 + 1 * 8
+ self.assertEqual(len(serialized[4:]), fixed_part_size + 8)
+
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], TS_2025_04_08_10_30_00_123456)
+
+ def test_timestamp_golden_2025_01_01(self):
+ """Hard-coded Java golden: 2025-01-01 00:00:00 UTC == 1_735_689_600_000 ms.
+ This is what catches a regression into local-time semantics, since the
+ value is picked so every tz offset produces a different number."""
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+ ts = datetime(2025, 1, 1, 0, 0, 0)
+ serialized = GenericRowSerializer.to_bytes(GenericRow([ts], fields, RowKind.INSERT))
+ data = serialized[4:]
+ millis = struct.unpack('<q', data[8:16])[0]
+ self.assertEqual(millis, TS_2025_01_01_MILLIS)
+
+ def test_timestamp_pre_epoch(self):
+ """Negative epoch millis round-trip correctly.
+
+ Golden: 1969-07-20 20:17:00 UTC == -14_182_980_000 ms (Java-computed)."""
+ fields3 = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
+ ts_pre = datetime(1969, 7, 20, 20, 17, 0)
+ serialized = GenericRowSerializer.to_bytes(GenericRow([ts_pre], fields3, RowKind.INSERT))
+ data = serialized[4:]
+ millis = struct.unpack('<q', data[8:16])[0]
+ self.assertEqual(millis, TS_1969_07_20_20_17_00_MILLIS)
+
+ result = GenericRowDeserializer.from_bytes(serialized, fields3)
+ self.assertEqual(result.values[0], ts_pre)
+
+ # Non-compact
+ fields6 = [DataField(0, "ts", AtomicType("TIMESTAMP(6)"))]
+ ts_pre_us = datetime(1969, 7, 20, 20, 17, 0, 123456)
+ row6 = GenericRow([ts_pre_us], fields6, RowKind.INSERT)
+ serialized6 = GenericRowSerializer.to_bytes(row6)
+ result6 = GenericRowDeserializer.from_bytes(serialized6, fields6)
+ self.assertEqual(result6.values[0], ts_pre_us)
+
+ def test_timestamp_ltz_non_compact(self):
+ fields = [DataField(0, "ts", AtomicType("TIMESTAMP_LTZ(6)"))]
+ ts = datetime(2025, 4, 8, 10, 30, 0, 654321)
+ row = GenericRow([ts], fields, RowKind.INSERT)
+ serialized = GenericRowSerializer.to_bytes(row)
+ result = GenericRowDeserializer.from_bytes(serialized, fields)
+ self.assertEqual(result.values[0], ts)
+
+
+if __name__ == '__main__':
+ unittest.main()