This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cce1db3fe8 [python] fix pypaimon non-timezone-free timestamp issue (#6750)
cce1db3fe8 is described below

commit cce1db3fe82998316a3be4b29f25f3a70ec6e67e
Author: XiaoHongbo <[email protected]>
AuthorDate: Sat Dec 6 19:50:26 2025 +0800

    [python] fix pypaimon non-timezone-free timestamp issue (#6750)
---
 .../apache/paimon/manifest/ManifestFileTest.java   |  34 +++++
 .../paimon/table/system/ManifestsTableTest.java    |  46 ++++++
 paimon-python/pypaimon/data/__init__.py            |  21 +++
 paimon-python/pypaimon/data/timestamp.py           | 167 +++++++++++++++++++++
 .../pypaimon/manifest/manifest_file_manager.py     |  22 ++-
 .../pypaimon/manifest/schema/data_file_meta.py     |  74 ++++++++-
 .../pypaimon/tests/file_store_commit_test.py       | 122 ++++++++-------
 .../manifest/manifest_entry_identifier_test.py     |   3 +-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |  19 ++-
 paimon-python/pypaimon/tests/reader_base_test.py   |  19 ++-
 .../pypaimon/tests/reader_primary_key_test.py      |  43 ++++++
 paimon-python/pypaimon/write/file_store_commit.py  |   4 +-
 paimon-python/pypaimon/write/writer/blob_writer.py |   6 +-
 .../pypaimon/write/writer/data_blob_writer.py      |   6 +-
 paimon-python/pypaimon/write/writer/data_writer.py |   6 +-
 15 files changed, 514 insertions(+), 78 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 142ebc6a47..2e45d904a5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.FileStorePathFactory;
 
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
@@ -86,6 +87,39 @@ public class ManifestFileTest {
         }
     }
 
+    @Test
+    void testManifestCreationTimeTimestamp() {
+        List<ManifestEntry> entries = generateData();
+        ManifestFile manifestFile = createManifestFile(tempDir.toString());
+
+        List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
+        List<ManifestEntry> actualEntries =
+                actualMetas.stream()
+                        .flatMap(m -> manifestFile.read(m.fileName(), m.fileSize()).stream())
+                        .collect(Collectors.toList());
+
+        int creationTimesFound = 0;
+        for (ManifestEntry entry : actualEntries) {
+            if (entry.file().creationTime() != null) {
+                creationTimesFound++;
+                org.apache.paimon.data.Timestamp creationTime = entry.file().creationTime();
+                assertThat(creationTime).isNotNull();
+                long epochMillis = entry.file().creationTimeEpochMillis();
+                assertThat(epochMillis).isPositive();
+                long expectedEpochMillis = creationTime.getMillisecond();
+                java.time.ZoneId systemZone = java.time.ZoneId.systemDefault();
+                java.time.ZoneOffset offset =
+                        systemZone
+                                .getRules()
+                                .getOffset(java.time.Instant.ofEpochMilli(expectedEpochMillis));
+                expectedEpochMillis = expectedEpochMillis - (offset.getTotalSeconds() * 1000L);
+                assertThat(epochMillis).isEqualTo(expectedEpochMillis);
+            }
+        }
+
+        assertThat(creationTimesFound).isPositive();
+    }
+
     private List<ManifestEntry> generateData() {
         List<ManifestEntry> entries = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index c614ca2c27..7afd079b0d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -28,10 +28,13 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
@@ -161,6 +164,49 @@ public class ManifestsTableTest extends TableTestBase {
                 "Specified parameter scan.snapshot-id = 3 is not exist, you 
can set it in range from 1 to 2");
     }
 
+    @Test
+    void testManifestCreationTimeTimestamp() throws Exception {
+        Identifier identifier = identifier("T_CreationTime");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option("bucket", "1")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table testTable = catalog.getTable(identifier);
+
+        write(testTable, GenericRow.of(1, 1, 1), GenericRow.of(2, 2, 2));
+
+        FileStoreScan scan = ((FileStoreTable) testTable).store().newScan();
+        FileStoreScan.Plan plan = scan.plan();
+        List<ManifestEntry> entries = plan.files();
+
+        int creationTimesFound = 0;
+        for (org.apache.paimon.manifest.ManifestEntry entry : entries) {
+            if (entry.file().creationTime() != null) {
+                creationTimesFound++;
+                org.apache.paimon.data.Timestamp creationTime = entry.file().creationTime();
+                assertThat(creationTime).isNotNull();
+                long epochMillis = entry.file().creationTimeEpochMillis();
+                assertThat(epochMillis).isPositive();
+                long expectedEpochMillis = creationTime.getMillisecond();
+                java.time.ZoneId systemZone = java.time.ZoneId.systemDefault();
+                java.time.ZoneOffset offset =
+                        systemZone
+                                .getRules()
+                                .getOffset(java.time.Instant.ofEpochMilli(expectedEpochMillis));
+                expectedEpochMillis = expectedEpochMillis - (offset.getTotalSeconds() * 1000L);
+                assertThat(epochMillis).isEqualTo(expectedEpochMillis);
+            }
+        }
+
+        assertThat(creationTimesFound).isPositive();
+    }
+
     private List<InternalRow> getExpectedResult(long snapshotId) {
         if (!snapshotManager.snapshotExists(snapshotId)) {
             return Collections.emptyList();
diff --git a/paimon-python/pypaimon/data/__init__.py b/paimon-python/pypaimon/data/__init__.py
new file mode 100644
index 0000000000..9f453647bb
--- /dev/null
+++ b/paimon-python/pypaimon/data/__init__.py
@@ -0,0 +1,21 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+from pypaimon.data.timestamp import Timestamp
+
+__all__ = ['Timestamp']
diff --git a/paimon-python/pypaimon/data/timestamp.py b/paimon-python/pypaimon/data/timestamp.py
new file mode 100644
index 0000000000..ace4c1e1c6
--- /dev/null
+++ b/paimon-python/pypaimon/data/timestamp.py
@@ -0,0 +1,167 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+from datetime import datetime, timedelta
+
+
+class Timestamp:
+    """
+    An internal data structure representing data of TimestampType.
+
+    This data structure is immutable and consists of milliseconds and nanos-of-millisecond since
+    1970-01-01 00:00:00. It might be stored in a compact representation (as a long value) if
+    values are small enough.
+
+    This class represents timezone-free timestamps.
+    """
+
+    # the number of milliseconds in a day
+    MILLIS_PER_DAY = 86400000  # = 24 * 60 * 60 * 1000
+
+    MICROS_PER_MILLIS = 1000
+    NANOS_PER_MICROS = 1000
+
+    NANOS_PER_HOUR = 3_600_000_000_000
+    NANOS_PER_MINUTE = 60_000_000_000
+    NANOS_PER_SECOND = 1_000_000_000
+    NANOS_PER_MICROSECOND = 1_000
+
+    def __init__(self, millisecond: int, nano_of_millisecond: int = 0):
+        if not (0 <= nano_of_millisecond <= 999_999):
+            raise ValueError(
+                f"nano_of_millisecond must be between 0 and 999,999, got {nano_of_millisecond}"
+            )
+        self._millisecond = millisecond
+        self._nano_of_millisecond = nano_of_millisecond
+
+    def get_millisecond(self) -> int:
+        """Returns the number of milliseconds since 1970-01-01 00:00:00."""
+        return self._millisecond
+
+    def get_nano_of_millisecond(self) -> int:
+        """
+        Returns the number of nanoseconds (the nanoseconds within the millisecond).
+        The value range is from 0 to 999,999.
+        """
+        return self._nano_of_millisecond
+
+    def to_local_date_time(self) -> datetime:
+        """Converts this Timestamp object to a datetime (timezone-free)."""
+        epoch = datetime(1970, 1, 1)
+        days = self._millisecond // self.MILLIS_PER_DAY
+        time_millis = self._millisecond % self.MILLIS_PER_DAY
+        if time_millis < 0:
+            days -= 1
+            time_millis += self.MILLIS_PER_DAY
+
+        microseconds = time_millis * 1000 + self._nano_of_millisecond // 1000
+        return epoch + timedelta(days=days, microseconds=microseconds)
+
+    def to_millis_timestamp(self) -> 'Timestamp':
+        return Timestamp.from_epoch_millis(self._millisecond)
+
+    def to_micros(self) -> int:
+        """Converts this Timestamp object to micros."""
+        micros = self._millisecond * self.MICROS_PER_MILLIS
+        return micros + self._nano_of_millisecond // self.NANOS_PER_MICROS
+
+    def __eq__(self, other):
+        if not isinstance(other, Timestamp):
+            return False
+        return (self._millisecond == other._millisecond and
+                self._nano_of_millisecond == other._nano_of_millisecond)
+
+    def __lt__(self, other):
+        if not isinstance(other, Timestamp):
+            return NotImplemented
+        if self._millisecond != other._millisecond:
+            return self._millisecond < other._millisecond
+        return self._nano_of_millisecond < other._nano_of_millisecond
+
+    def __le__(self, other):
+        return self == other or self < other
+
+    def __gt__(self, other):
+        if not isinstance(other, Timestamp):
+            return NotImplemented
+        if self._millisecond != other._millisecond:
+            return self._millisecond > other._millisecond
+        return self._nano_of_millisecond > other._nano_of_millisecond
+
+    def __ge__(self, other):
+        return self == other or self > other
+
+    def __hash__(self):
+        return hash((self._millisecond, self._nano_of_millisecond))
+
+    def __repr__(self):
+        return f"Timestamp(millisecond={self._millisecond}, nano_of_millisecond={self._nano_of_millisecond})"
+
+    def __str__(self):
+        return self.to_local_date_time().strftime("%Y-%m-%d %H:%M:%S.%f")
+
+    @staticmethod
+    def now() -> 'Timestamp':
+        """Creates an instance of Timestamp for now."""
+        return Timestamp.from_local_date_time(datetime.now())
+
+    @staticmethod
+    def from_epoch_millis(milliseconds: int, nanos_of_millisecond: int = 0) -> 'Timestamp':
+        """
+        Creates an instance of Timestamp from milliseconds.
+        Args:
+            milliseconds: the number of milliseconds since 1970-01-01 00:00:00
+            nanos_of_millisecond: the nanoseconds within the millisecond, from 0 to 999,999
+        """
+        return Timestamp(milliseconds, nanos_of_millisecond)
+
+    @staticmethod
+    def from_local_date_time(date_time: datetime) -> 'Timestamp':
+        """
+        Creates an instance of Timestamp from a datetime (timezone-free).
+
+        Args:
+            date_time: a datetime object (should be naive, without timezone)
+        """
+        if date_time.tzinfo is not None:
+            raise ValueError("datetime must be naive (no timezone)")
+
+        epoch_date = datetime(1970, 1, 1).date()
+        date_time_date = date_time.date()
+
+        epoch_day = (date_time_date - epoch_date).days
+        time_part = date_time.time()
+
+        nano_of_day = (
+            time_part.hour * Timestamp.NANOS_PER_HOUR
+            + time_part.minute * Timestamp.NANOS_PER_MINUTE
+            + time_part.second * Timestamp.NANOS_PER_SECOND
+            + time_part.microsecond * Timestamp.NANOS_PER_MICROSECOND
+        )
+
+        millisecond = epoch_day * Timestamp.MILLIS_PER_DAY + nano_of_day // 1_000_000
+        nano_of_millisecond = int(nano_of_day % 1_000_000)
+
+        return Timestamp(millisecond, nano_of_millisecond)
+
+    @staticmethod
+    def from_micros(micros: int) -> 'Timestamp':
+        """Creates an instance of Timestamp from micros."""
+        mills = micros // Timestamp.MICROS_PER_MILLIS
+        nanos = (micros - mills * Timestamp.MICROS_PER_MILLIS) * Timestamp.NANOS_PER_MICROS
+        return Timestamp.from_epoch_millis(mills, int(nanos))
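
For illustration only (not part of the commit): a minimal sketch of how the new
Timestamp class is meant to be used, relying solely on the methods added above.
A naive datetime should round-trip without any timezone shift, which is the point
of this fix.

    from datetime import datetime
    from pypaimon.data.timestamp import Timestamp

    # Timezone-free round-trip: no offset is applied in either direction.
    dt = datetime(2024, 1, 15, 10, 30, 0, 123456)
    ts = Timestamp.from_local_date_time(dt)
    assert ts.to_local_date_time() == dt

    # Millis/micros conversions are lossless at microsecond precision.
    assert Timestamp.from_micros(ts.to_micros()) == ts
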
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 5aefd122b0..f6ae41e3d3 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -21,6 +21,8 @@ from typing import List
 
 import fastavro
 
+from datetime import datetime
+
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
@@ -105,6 +107,22 @@ class ManifestFileManager:
                 max_values=BinaryRow(value_dict['_MAX_VALUES'], fields),
                 null_counts=value_dict['_NULL_COUNTS'],
             )
+            # fastavro returns a UTC-aware datetime for timestamp-millis, so we need to convert it properly
+            from pypaimon.data.timestamp import Timestamp
+            creation_time_value = file_dict['_CREATION_TIME']
+            creation_time_ts = None
+            if creation_time_value is not None:
+                if isinstance(creation_time_value, datetime):
+                    if creation_time_value.tzinfo:
+                        epoch_millis = int(creation_time_value.timestamp() * 1000)
+                        creation_time_ts = Timestamp.from_epoch_millis(epoch_millis)
+                    else:
+                        creation_time_ts = Timestamp.from_local_date_time(creation_time_value)
+                elif isinstance(creation_time_value, (int, float)):
+                    creation_time_ts = Timestamp.from_epoch_millis(int(creation_time_value))
+                else:
+                    raise ValueError(f"Unexpected creation_time type: {type(creation_time_value)}")
+
             file_meta = DataFileMeta(
                 file_name=file_dict['_FILE_NAME'],
                 file_size=file_dict['_FILE_SIZE'],
@@ -118,7 +136,7 @@ class ManifestFileManager:
                 schema_id=file_dict['_SCHEMA_ID'],
                 level=file_dict['_LEVEL'],
                 extra_files=file_dict['_EXTRA_FILES'],
-                creation_time=file_dict['_CREATION_TIME'],
+                creation_time=creation_time_ts,
                 delete_row_count=file_dict['_DELETE_ROW_COUNT'],
                 embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
                 file_source=file_dict['_FILE_SOURCE'],
@@ -187,7 +205,7 @@ class ManifestFileManager:
                     "_SCHEMA_ID": entry.file.schema_id,
                     "_LEVEL": entry.file.level,
                     "_EXTRA_FILES": entry.file.extra_files,
-                    "_CREATION_TIME": entry.file.creation_time,
+                    "_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None,
                     "_DELETE_ROW_COUNT": entry.file.delete_row_count,
                     "_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
                     "_FILE_SOURCE": entry.file.file_source,
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index d7942d1e9f..239c63dbba 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -19,7 +19,9 @@
 from dataclasses import dataclass
 from datetime import datetime
 from typing import List, Optional
+import time
 
+from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.simple_stats import (KEY_STATS_SCHEMA, VALUE_STATS_SCHEMA,
                                                    SimpleStats)
 from pypaimon.table.row.generic_row import GenericRow
@@ -40,7 +42,7 @@ class DataFileMeta:
     level: int
     extra_files: List[str]
 
-    creation_time: Optional[datetime] = None
+    creation_time: Optional[Timestamp] = None
     delete_row_count: Optional[int] = None
     embedded_index: Optional[bytes] = None
     file_source: Optional[int] = None
@@ -52,6 +54,76 @@ class DataFileMeta:
     # not a schema field, just for internal usage
     file_path: str = None
 
+    def get_creation_time(self) -> Optional[Timestamp]:
+        return self.creation_time
+
+    def creation_time_epoch_millis(self) -> Optional[int]:
+        if self.creation_time is None:
+            return None
+        local_dt = self.creation_time.to_local_date_time()
+        local_time_struct = local_dt.timetuple()
+        local_timestamp = time.mktime(local_time_struct)
+        utc_timestamp = time.mktime(time.gmtime(local_timestamp))
+        tz_offset_seconds = int(local_timestamp - utc_timestamp)
+        return int((local_timestamp - tz_offset_seconds) * 1000)
+
+    def creation_time_as_datetime(self) -> Optional[datetime]:
+        if self.creation_time is None:
+            return None
+        return self.creation_time.to_local_date_time()
+
+    @classmethod
+    def create(
+        cls,
+        file_name: str,
+        file_size: int,
+        row_count: int,
+        min_key: GenericRow,
+        max_key: GenericRow,
+        key_stats: SimpleStats,
+        value_stats: SimpleStats,
+        min_sequence_number: int,
+        max_sequence_number: int,
+        schema_id: int,
+        level: int,
+        extra_files: List[str],
+        creation_time: Optional[Timestamp] = None,
+        delete_row_count: Optional[int] = None,
+        embedded_index: Optional[bytes] = None,
+        file_source: Optional[int] = None,
+        value_stats_cols: Optional[List[str]] = None,
+        external_path: Optional[str] = None,
+        first_row_id: Optional[int] = None,
+        write_cols: Optional[List[str]] = None,
+        file_path: Optional[str] = None,
+    ) -> 'DataFileMeta':
+        if creation_time is None:
+            creation_time = Timestamp.now()
+
+        return cls(
+            file_name=file_name,
+            file_size=file_size,
+            row_count=row_count,
+            min_key=min_key,
+            max_key=max_key,
+            key_stats=key_stats,
+            value_stats=value_stats,
+            min_sequence_number=min_sequence_number,
+            max_sequence_number=max_sequence_number,
+            schema_id=schema_id,
+            level=level,
+            extra_files=extra_files,
+            creation_time=creation_time,
+            delete_row_count=delete_row_count,
+            embedded_index=embedded_index,
+            file_source=file_source,
+            value_stats_cols=value_stats_cols,
+            external_path=external_path,
+            first_row_id=first_row_id,
+            write_cols=write_cols,
+            file_path=file_path,
+        )
+
     def set_file_path(self, table_path: str, partition: GenericRow, bucket: int):
         path_builder = table_path.rstrip('/')
         partition_dict = partition.to_dict()
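
For illustration only (not part of the commit): creation_time_epoch_millis above
derives the local UTC offset from time.mktime/time.gmtime instead of naming a zone.
The same derivation, spelled out with illustrative variable names:

    import time
    from datetime import datetime

    local_dt = datetime(2024, 1, 15, 10, 30, 0)   # timezone-free wall clock
    local_ts = time.mktime(local_dt.timetuple())  # interpret wall clock as local time
    utc_ts = time.mktime(time.gmtime(local_ts))   # re-read the UTC wall clock as local
    tz_offset_seconds = int(local_ts - utc_ts)    # e.g. 28800 on UTC+8
    epoch_millis = int((local_ts - tz_offset_seconds) * 1000)
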
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
index f1fe4de3d2..438ff6aeb8 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -61,20 +61,24 @@ class TestFileStoreCommit(unittest.TestCase):
         file_store_commit = self._create_file_store_commit()
 
         # Create test data
-        creation_time = datetime(2024, 1, 15, 10, 30, 0)
-        file_meta = DataFileMeta(
+        creation_time_dt = datetime(2024, 1, 15, 10, 30, 0)
+        from pypaimon.data.timestamp import Timestamp
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.manifest.schema.simple_stats import SimpleStats
+        creation_time = Timestamp.from_local_date_time(creation_time_dt)
+        file_meta = DataFileMeta.create(
             file_name="test_file_1.parquet",
             file_size=1024 * 1024,  # 1MB
             row_count=10000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=1,
             max_sequence_number=100,
             schema_id=0,
             level=0,
-            extra_files=None,
+            extra_files=[],
             creation_time=creation_time,
             external_path=None,
             first_row_id=None,
@@ -99,7 +103,8 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat.record_count, 10000)
         self.assertEqual(stat.file_count, 1)
         self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
-        self.assertEqual(stat.last_file_creation_time, int(creation_time.timestamp() * 1000))
+        expected_time = file_meta.creation_time_epoch_millis()
+        self.assertEqual(stat.last_file_creation_time, expected_time)
 
     def test_generate_partition_statistics_multiple_files_same_partition(
             self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
@@ -107,38 +112,41 @@ class TestFileStoreCommit(unittest.TestCase):
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
 
-        creation_time_1 = datetime(2024, 1, 15, 10, 30, 0)
-        creation_time_2 = datetime(2024, 1, 15, 11, 30, 0)  # Later time
+        from pypaimon.data.timestamp import Timestamp
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.manifest.schema.simple_stats import SimpleStats
+        creation_time_1 = Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0))
+        creation_time_2 = Timestamp.from_local_date_time(datetime(2024, 1, 15, 11, 30, 0))  # Later time
 
-        file_meta_1 = DataFileMeta(
+        file_meta_1 = DataFileMeta.create(
             file_name="test_file_1.parquet",
             file_size=1024 * 1024,  # 1MB
             row_count=10000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=1,
             max_sequence_number=100,
             schema_id=0,
             level=0,
-            extra_files=None,
+            extra_files=[],
             creation_time=creation_time_1
         )
 
-        file_meta_2 = DataFileMeta(
+        file_meta_2 = DataFileMeta.create(
             file_name="test_file_2.parquet",
             file_size=2 * 1024 * 1024,  # 2MB
             row_count=15000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=101,
             max_sequence_number=200,
             schema_id=0,
             level=0,
-            extra_files=None,
+            extra_files=[],
             creation_time=creation_time_2
         )
 
@@ -159,8 +167,8 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat.record_count, 25000)  # 10000 + 15000
         self.assertEqual(stat.file_count, 2)
         self.assertEqual(stat.file_size_in_bytes, 3 * 1024 * 1024)  # 1MB + 2MB
-        # Should have the latest creation time
-        self.assertEqual(stat.last_file_creation_time, int(creation_time_2.timestamp() * 1000))
+        expected_time = file_meta_2.creation_time_epoch_millis()
+        self.assertEqual(stat.last_file_creation_time, expected_time)
 
     def test_generate_partition_statistics_multiple_partitions(
             self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
@@ -168,22 +176,26 @@ class TestFileStoreCommit(unittest.TestCase):
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
 
-        creation_time = datetime(2024, 1, 15, 10, 30, 0)
+        creation_time_dt = datetime(2024, 1, 15, 10, 30, 0)
+        from pypaimon.data.timestamp import Timestamp
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.manifest.schema.simple_stats import SimpleStats
+        creation_time = Timestamp.from_local_date_time(creation_time_dt)
 
         # File for partition 1
-        file_meta_1 = DataFileMeta(
+        file_meta_1 = DataFileMeta.create(
             file_name="test_file_1.parquet",
             file_size=1024 * 1024,
             row_count=10000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=1,
             max_sequence_number=100,
             schema_id=0,
             level=0,
-            extra_files=None,
+            extra_files=[],
             creation_time=creation_time,
             external_path=None,
             first_row_id=None,
@@ -191,19 +203,19 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # File for partition 2
-        file_meta_2 = DataFileMeta(
+        file_meta_2 = DataFileMeta.create(
             file_name="test_file_2.parquet",
             file_size=2 * 1024 * 1024,
             row_count=20000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=101,
             max_sequence_number=200,
             schema_id=0,
             level=0,
-            extra_files=None,
+            extra_files=[],
             creation_time=creation_time,
             external_path=None,
             first_row_id=None,
@@ -255,20 +267,24 @@ class TestFileStoreCommit(unittest.TestCase):
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
 
-        creation_time = datetime(2024, 1, 15, 10, 30, 0)
-        file_meta = DataFileMeta(
+        creation_time_dt = datetime(2024, 1, 15, 10, 30, 0)
+        from pypaimon.data.timestamp import Timestamp
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.manifest.schema.simple_stats import SimpleStats
+        creation_time = Timestamp.from_local_date_time(creation_time_dt)
+        file_meta = DataFileMeta.create(
             file_name="test_file_1.parquet",
             file_size=1024 * 1024,
             row_count=10000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=1,
             max_sequence_number=100,
             schema_id=0,
             level=0,
-            extra_files=None,
+            extra_files=[],
             creation_time=creation_time,
             external_path=None,
             first_row_id=None,
@@ -312,7 +328,6 @@ class TestFileStoreCommit(unittest.TestCase):
             schema_id=0,
             level=0,
             extra_files=None,
-            creation_time=None  # No creation time
         )
 
         commit_message = CommitMessage(
@@ -338,20 +353,23 @@ class TestFileStoreCommit(unittest.TestCase):
         file_store_commit = self._create_file_store_commit()
 
         # Table has 2 partition keys but partition tuple has 3 values
-        file_meta = DataFileMeta(
+        from pypaimon.data.timestamp import Timestamp
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.manifest.schema.simple_stats import SimpleStats
+        file_meta = DataFileMeta.create(
             file_name="test_file_1.parquet",
             file_size=1024 * 1024,
             row_count=10000,
-            min_key=None,
-            max_key=None,
-            key_stats=None,
-            value_stats=None,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=SimpleStats.empty_stats(),
             min_sequence_number=1,
             max_sequence_number=100,
             schema_id=0,
             level=0,
-            extra_files=None,
-            creation_time=datetime(2024, 1, 15, 10, 30, 0)
+            extra_files=[],
+            creation_time=Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0))
         )
 
         commit_message = CommitMessage(
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py b/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
index bbe1833f2c..5b86e39a37 100644
--- a/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
+++ b/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
@@ -55,6 +55,7 @@ class ManifestEntryIdentifierTest(unittest.TestCase):
 
     def _create_file_meta(self, file_name, level=0, extra_files=None, external_path=None):
         """Helper to create DataFileMeta with common defaults."""
+        from pypaimon.data.timestamp import Timestamp
         return DataFileMeta(
             file_name=file_name,
             file_size=1024,
@@ -76,7 +77,7 @@ class ManifestEntryIdentifierTest(unittest.TestCase):
             schema_id=0,
             level=level,
             extra_files=extra_files or [],
-            creation_time=1234567890,
+            creation_time=Timestamp.from_epoch_millis(1234567890),
             delete_row_count=0,
             embedded_index=None,
             file_source=None,
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 6e6d57f963..2c3a6775c0 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -17,7 +17,7 @@ limitations under the License.
 """
 import logging
 import time
-from datetime import date, datetime
+from datetime import date
 from decimal import Decimal
 from unittest.mock import Mock
 
@@ -208,10 +208,12 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         partition = GenericRow(['East', 'Boston'], partition_fields)
 
         # Create ADD entry
+        from pypaimon.data.timestamp import Timestamp
         add_file_meta = Mock(spec=DataFileMeta)
         add_file_meta.row_count = 200
         add_file_meta.file_size = 2048
-        add_file_meta.creation_time = datetime.now()
+        add_file_meta.creation_time = Timestamp.now()
+        add_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         add_entry = ManifestEntry(
             kind=0,  # ADD
@@ -225,7 +227,8 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         delete_file_meta = Mock(spec=DataFileMeta)
         delete_file_meta.row_count = 80
         delete_file_meta.file_size = 800
-        delete_file_meta.creation_time = datetime.now()
+        delete_file_meta.creation_time = Timestamp.now()
+        delete_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         delete_entry = ManifestEntry(
             kind=1,  # DELETE
@@ -261,10 +264,12 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
             DataField(1, "city", AtomicType("STRING"))
         ]
         partition1 = GenericRow(['East', 'Boston'], partition_fields)
+        from pypaimon.data.timestamp import Timestamp
         file_meta1 = Mock(spec=DataFileMeta)
         file_meta1.row_count = 150
         file_meta1.file_size = 1500
-        file_meta1.creation_time = datetime.now()
+        file_meta1.creation_time = Timestamp.now()
+        file_meta1.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         entry1 = ManifestEntry(
             kind=0,  # ADD
@@ -279,7 +284,8 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         file_meta2 = Mock(spec=DataFileMeta)
         file_meta2.row_count = 75
         file_meta2.file_size = 750
-        file_meta2.creation_time = datetime.now()
+        file_meta2.creation_time = Timestamp.now()
+        file_meta2.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         entry2 = ManifestEntry(
             kind=1,  # DELETE
@@ -816,6 +822,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         )
 
         # Create DataFileMeta with value_stats_cols
+        from pypaimon.data.timestamp import Timestamp
         file_meta = DataFileMeta(
             file_name=f"test-file-{test_name}.parquet",
             file_size=1024,
@@ -829,7 +836,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
             schema_id=0,
             level=0,
             extra_files=[],
-            creation_time=1234567890,
+            creation_time=Timestamp.from_epoch_millis(1234567890),
             delete_row_count=0,
             embedded_index=None,
             file_source=None,
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index fdcb9f1ea2..b41cd2e05f 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -296,10 +296,13 @@ class ReaderBasicTest(unittest.TestCase):
         partition = GenericRow(['East', 'Boston'], partition_fields)
 
         # Create ADD entry
+        from pypaimon.data.timestamp import Timestamp
+        import time
         add_file_meta = Mock(spec=DataFileMeta)
         add_file_meta.row_count = 200
         add_file_meta.file_size = 2048
-        add_file_meta.creation_time = datetime.now()
+        add_file_meta.creation_time = Timestamp.now()
+        add_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         add_entry = ManifestEntry(
             kind=0,  # ADD
@@ -313,7 +316,8 @@ class ReaderBasicTest(unittest.TestCase):
         delete_file_meta = Mock(spec=DataFileMeta)
         delete_file_meta.row_count = 80
         delete_file_meta.file_size = 800
-        delete_file_meta.creation_time = datetime.now()
+        delete_file_meta.creation_time = Timestamp.now()
+        delete_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         delete_entry = ManifestEntry(
             kind=1,  # DELETE
@@ -349,10 +353,13 @@ class ReaderBasicTest(unittest.TestCase):
             DataField(1, "city", AtomicType("STRING"))
         ]
         partition1 = GenericRow(['East', 'Boston'], partition_fields)
+        from pypaimon.data.timestamp import Timestamp
+        import time
         file_meta1 = Mock(spec=DataFileMeta)
         file_meta1.row_count = 150
         file_meta1.file_size = 1500
-        file_meta1.creation_time = datetime.now()
+        file_meta1.creation_time = Timestamp.now()
+        file_meta1.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         entry1 = ManifestEntry(
             kind=0,  # ADD
@@ -367,7 +374,8 @@ class ReaderBasicTest(unittest.TestCase):
         file_meta2 = Mock(spec=DataFileMeta)
         file_meta2.row_count = 75
         file_meta2.file_size = 750
-        file_meta2.creation_time = datetime.now()
+        file_meta2.creation_time = Timestamp.now()
+        file_meta2.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
 
         entry2 = ManifestEntry(
             kind=1,  # DELETE
@@ -600,6 +608,7 @@ class ReaderBasicTest(unittest.TestCase):
         )
 
         # Create DataFileMeta with value_stats_cols
+        from pypaimon.data.timestamp import Timestamp
         file_meta = DataFileMeta(
             file_name=f"test-file-{test_name}.parquet",
             file_size=1024,
@@ -613,7 +622,7 @@ class ReaderBasicTest(unittest.TestCase):
             schema_id=0,
             level=0,
             extra_files=[],
-            creation_time=1234567890,
+            creation_time=Timestamp.from_epoch_millis(1234567890),
             delete_row_count=0,
             embedded_index=None,
             file_source=None,
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index b62783852a..762229438e 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -344,6 +344,49 @@ class PkReaderTest(unittest.TestCase):
         }, schema=self.pa_schema).sort_by('user_id')
         self.assertEqual(expected, actual)
 
+    def test_manifest_creation_time_timestamp(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_manifest_creation_time', schema, False)
+        table = self.catalog.get_table('default.test_manifest_creation_time')
+
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        manifest_list_manager = table_scan.starting_scanner.manifest_list_manager
+        manifest_files = manifest_list_manager.read_all(latest_snapshot)
+
+        manifest_file_manager = table_scan.starting_scanner.manifest_file_manager
+        creation_times_found = []
+        for manifest_file_meta in manifest_files:
+            entries = manifest_file_manager.read(manifest_file_meta.file_name, drop_stats=False)
+            for entry in entries:
+                if entry.file.creation_time is not None:
+                    creation_time = entry.file.creation_time
+                    self.assertIsNotNone(creation_time)
+                    epoch_millis = entry.file.creation_time_epoch_millis()
+                    self.assertIsNotNone(epoch_millis)
+                    self.assertGreater(epoch_millis, 0)
+                    import time
+                    expected_epoch_millis = creation_time.get_millisecond()
+                    local_dt = creation_time.to_local_date_time()
+                    local_time_struct = local_dt.timetuple()
+                    local_timestamp = time.mktime(local_time_struct)
+                    local_time_struct_utc = time.gmtime(local_timestamp)
+                    utc_timestamp = time.mktime(local_time_struct_utc)
+                    expected_epoch_millis = int(utc_timestamp * 1000)
+                    self.assertEqual(epoch_millis, expected_epoch_millis)
+                    creation_times_found.append(epoch_millis)
+
+        self.assertGreater(
+            len(creation_times_found), 0,
+            "At least one manifest entry should have creation_time")
+
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
 
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 014e8fbf6e..5c642ca6b1 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -308,9 +308,9 @@ class FileStoreCommit:
             file_size_in_bytes = file_meta.file_size if entry.kind == 0 else file_meta.file_size * -1
             file_count = 1 if entry.kind == 0 else -1
 
-            # Convert creation_time to milliseconds (Java uses epoch millis)
+            # Use epoch millis
             if file_meta.creation_time:
-                file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
+                file_creation_time = file_meta.creation_time_epoch_millis()
             else:
                 file_creation_time = int(time.time() * 1000)
 
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index c8d0d45076..6dc7b177b1 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -22,6 +22,7 @@ import pyarrow as pa
 from typing import Optional, Tuple, Dict
 
 from pypaimon.common.core_options import CoreOptions
+from pypaimon.data.timestamp import Timestamp
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
 from pypaimon.write.writer.blob_file_writer import BlobFileWriter
 
@@ -159,7 +160,6 @@ class BlobWriter(AppendOnlyDataWriter):
     def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count, file_size: int,
                            external_path: Optional[str] = None):
         """Add file metadata to committed_files."""
-        from datetime import datetime
         from pypaimon.manifest.schema.data_file_meta import DataFileMeta
         from pypaimon.manifest.schema.simple_stats import SimpleStats
         from pypaimon.table.row.generic_row import GenericRow
@@ -191,7 +191,7 @@ class BlobWriter(AppendOnlyDataWriter):
         max_seq = self.sequence_generator.current - 1
         self.sequence_generator.start = self.sequence_generator.current
 
-        self.committed_files.append(DataFileMeta(
+        self.committed_files.append(DataFileMeta.create(
             file_name=file_name,
             file_size=file_size,
             row_count=row_count,
@@ -207,7 +207,7 @@ class BlobWriter(AppendOnlyDataWriter):
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],
-            creation_time=datetime.now(),
+            creation_time=Timestamp.now(),
             delete_row_count=0,
             file_source=0,  # FileSource.APPEND = 0
             value_stats_cols=None,
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 65107856ee..079b408e66 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -18,12 +18,12 @@
 
 import logging
 import uuid
-from datetime import datetime
 from typing import List, Optional, Tuple, Dict
 
 import pyarrow as pa
 
 from pypaimon.common.core_options import CoreOptions
+from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.generic_row import GenericRow
@@ -286,7 +286,7 @@ class DataBlobWriter(DataWriter):
 
         self.sequence_generator.start = self.sequence_generator.current
 
-        return DataFileMeta(
+        return DataFileMeta.create(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
             row_count=data.num_rows,
@@ -305,7 +305,7 @@ class DataBlobWriter(DataWriter):
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],
-            creation_time=datetime.now(),
+            creation_time=Timestamp.now(),
             delete_row_count=0,
             file_source=0,
             value_stats_cols=self.normal_column_names,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 8656a58879..e043da0eee 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -19,11 +19,11 @@ import pyarrow as pa
 import pyarrow.compute as pc
 import uuid
 from abc import ABC, abstractmethod
-from datetime import datetime
 from typing import Dict, List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.external_path_provider import ExternalPathProvider
+from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.schema.data_types import PyarrowFieldParser
@@ -211,7 +211,7 @@ class DataWriter(ABC):
         min_seq = self.sequence_generator.start
         max_seq = self.sequence_generator.current
         self.sequence_generator.start = self.sequence_generator.current
-        self.committed_files.append(DataFileMeta(
+        self.committed_files.append(DataFileMeta.create(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
             row_count=data.num_rows,
@@ -232,7 +232,7 @@ class DataWriter(ABC):
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],
-            creation_time=datetime.now(),
+            creation_time=Timestamp.now(),
             delete_row_count=0,
             file_source=0,
             value_stats_cols=None,  # None means all columns in the data have statistics
