This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ece2fe34e [lake] Lake tiering service report the max timestamp of fluss record (#1700)
ece2fe34e is described below

commit ece2fe34e1f6a672b931486cef2e88be7992ac0c
Author: Liebing <[email protected]>
AuthorDate: Wed Sep 17 20:20:50 2025 +0800

    [lake] Lake tiering service report the max timestamp of fluss record (#1700)
---
 .../tiering/committer/FlussTableLakeSnapshot.java  |  27 ++++-
 .../committer/FlussTableLakeSnapshotCommitter.java |  26 ++--
 .../tiering/committer/TieringCommitOperator.java   |  10 +-
 .../tiering/source/TableBucketWriteResult.java     |  12 +-
 .../source/TableBucketWriteResultSerializer.java   |   6 +
 .../flink/tiering/source/TieringSplitReader.java   |  29 ++++-
 .../committer/TieringCommitOperatorTest.java       | 131 +++++++++++++++++----
 .../TableBucketWriteResultSerializerTest.java      |   5 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |   2 +
 .../fluss/server/entity/LakeBucketOffset.java      |   9 +-
 .../org/apache/fluss/server/log/LogTablet.java     |  11 ++
 .../fluss/server/replica/ReplicaManager.java       |   8 ++
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  14 ++-
 .../apache/fluss/server/zk/ZooKeeperClient.java    |   6 +
 .../fluss/server/zk/data/LakeTableSnapshot.java    |  17 ++-
 .../server/zk/data/LakeTableSnapshotJsonSerde.java |  14 +++
 .../replica/CommitLakeTableSnapshotITCase.java     |  16 ++-
 .../replica/NotifyReplicaLakeTableOffsetTest.java  |  28 +++--
 .../zk/data/LakeTableSnapshotJsonSerdeTest.java    |  18 ++-
 19 files changed, 328 insertions(+), 61 deletions(-)
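
Summary of the change: the tiering service now threads a per-bucket max record
timestamp through the whole pipeline. TieringSplitReader captures the last tiered
record's timestamp, TableBucketWriteResult and its serializer carry it, the
TieringCommitOperator and FlussTableLakeSnapshotCommitter forward it in the extended
CommitLakeTableSnapshotRequest, and the server persists it in the ZooKeeper
LakeTableSnapshot and propagates it to each replica's LogTablet. Empty log splits and
snapshot splits report -1 as an unknown-timestamp sentinel.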

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
index b0ddc062a..510949902 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
@@ -22,6 +22,7 @@ import org.apache.fluss.utils.types.Tuple2;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /** A lake snapshot for a Fluss table. */
 class FlussTableLakeSnapshot {
@@ -34,10 +35,15 @@ class FlussTableLakeSnapshot {
     // if the bucket is not of a partition, the partition_name is null
     private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
 
+    // <table_bucket, partition_name> -> max timestamps,
+    // if the bucket is not of a partition, the partition_name is null
+    private final Map<Tuple2<TableBucket, String>, Long> maxTimestamps;
+
     FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
         this.tableId = tableId;
         this.lakeSnapshotId = lakeSnapshotId;
         this.logEndOffsets = new HashMap<>();
+        this.maxTimestamps = new HashMap<>();
     }
 
     public long tableId() {
@@ -48,16 +54,27 @@ class FlussTableLakeSnapshot {
         return lakeSnapshotId;
     }
 
-    public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
-        return logEndOffsets;
+    public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
+        return logEndOffsets.keySet();
     }
 
-    public void addBucketOffset(TableBucket bucket, long offset) {
+    public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset, long timestamp) {
         logEndOffsets.put(Tuple2.of(bucket, null), offset);
+        maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
     }
 
-    public void addPartitionBucketOffset(TableBucket bucket, String partitionName, long offset) {
+    public void addPartitionBucketOffsetAndTimestamp(
+            TableBucket bucket, String partitionName, long offset, long timestamp) {
         logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
+        maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
+    }
+
+    public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
+        return logEndOffsets.get(bucketPartition);
+    }
+
+    public long getMaxTimestamp(Tuple2<TableBucket, String> bucketPartition) {
+        return maxTimestamps.get(bucketPartition);
     }
 
     @Override
@@ -69,6 +86,8 @@ class FlussTableLakeSnapshot {
                 + lakeSnapshotId
                 + ", logEndOffsets="
                 + logEndOffsets
+                + ", maxTimestamps="
+                + maxTimestamps
                 + '}';
     }
 }
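
For orientation, here is a minimal, self-contained sketch of the bookkeeping this
class now performs; plain String keys stand in for Fluss's Tuple2<TableBucket, String>,
and all names below are illustrative rather than the Fluss API:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for FlussTableLakeSnapshot's bookkeeping: one map for log end offsets
    // and one for max timestamps, keyed identically so every bucket has both values.
    class SnapshotBookkeeping {
        private final Map<String, Long> logEndOffsets = new HashMap<>();
        private final Map<String, Long> maxTimestamps = new HashMap<>();

        // partitionName may be null for buckets of a non-partitioned table
        void add(String bucket, String partitionName, long offset, long timestamp) {
            String key = bucket + "/" + partitionName;
            logEndOffsets.put(key, offset);
            maxTimestamps.put(key, timestamp);
        }

        long endOffset(String bucket, String partitionName) {
            return logEndOffsets.get(bucket + "/" + partitionName);
        }

        long maxTimestamp(String bucket, String partitionName) {
            return maxTimestamps.get(bucket + "/" + partitionName);
        }

        public static void main(String[] args) {
            SnapshotBookkeeping s = new SnapshotBookkeeping();
            s.add("t1-b0", null, 11L, 21L);
            System.out.println(s.endOffset("t1-b0", null) + " @ " + s.maxTimestamp("t1-b0", null));
        }
    }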
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index a0adc775b..a6cc47767 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -88,7 +88,12 @@ public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
             Long partitionId = partitionBucket.f0;
             if (partitionId == null) {
                 tableBucket = new TableBucket(tableId, partitionBucket.f1);
-                flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
+                // We use -1 since, for simplicity, we don't store the timestamp in the lake
+                // snapshot property; the timestamp may therefore be -1 when reconstructing
+                // the lake snapshot to commit to Fluss. This should happen rarely, and the
+                // value becomes normal after the next tiering round.
+                flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
+                        tableBucket, entry.getValue(), -1);
             } else {
                 tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
                 // the partition name is qualified partition name in format:
@@ -98,8 +103,11 @@ public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
                         committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
             ResolvedPartitionSpec resolvedPartitionSpec =
                         ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
-                flussTableLakeSnapshot.addPartitionBucketOffset(
-                        tableBucket, resolvedPartitionSpec.getPartitionName(), entry.getValue());
+                flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
+                        tableBucket,
+                        resolvedPartitionSpec.getPartitionName(),
+                        entry.getValue(),
+                        -1);
             }
         }
         commit(flussTableLakeSnapshot);
@@ -114,13 +122,14 @@ public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
 
         pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
         pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry :
-                flussTableLakeSnapshot.logEndOffsets().entrySet()) {
+        for (Tuple2<TableBucket, String> bucketPartition :
+                flussTableLakeSnapshot.tablePartitionBuckets()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
-            String partitionName = bucketEndOffsetEntry.getKey().f1;
-            long endOffset = bucketEndOffsetEntry.getValue();
+            TableBucket tableBucket = bucketPartition.f0;
+            String partitionName = bucketPartition.f1;
+            long endOffset = flussTableLakeSnapshot.getLogEndOffset(bucketPartition);
+            long maxTimestamp = flussTableLakeSnapshot.getMaxTimestamp(bucketPartition);
             if (tableBucket.getPartitionId() != null) {
                 pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
@@ -129,6 +138,7 @@ public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
             }
             pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
             pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
+            pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
         }
         return commitLakeTableSnapshotRequest;
     }
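
The -1 fallback above only applies on the recovery path, where the committer rebuilds
a FlussTableLakeSnapshot from a CommittedLakeSnapshot that stores no timestamps. A
small sketch of the sentinel convention (a hypothetical helper, not Fluss code):

    // Hypothetical helper mirroring the convention used above: a missing timestamp is
    // reported as the -1 sentinel rather than omitted, and is expected to be replaced
    // by a real value after the next tiering round.
    final class Timestamps {
        static final long UNKNOWN = -1L;

        static long orUnknown(Long stored) {
            return stored == null ? UNKNOWN : stored;
        }
    }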
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 282d12d80..779ca8913 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -229,10 +229,14 @@ public class TieringCommitOperator<WriteResult, Committable>
             for (TableBucketWriteResult<WriteResult> writeResult : committableWriteResults) {
                 TableBucket tableBucket = writeResult.tableBucket();
                 if (writeResult.tableBucket().getPartitionId() == null) {
-                    flussTableLakeSnapshot.addBucketOffset(tableBucket, writeResult.logEndOffset());
+                    flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
+                            tableBucket, writeResult.logEndOffset(), writeResult.maxTimestamp());
                 } else {
-                    flussTableLakeSnapshot.addPartitionBucketOffset(
-                            tableBucket, writeResult.partitionName(), writeResult.logEndOffset());
+                    flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
+                            tableBucket,
+                            writeResult.partitionName(),
+                            writeResult.logEndOffset(),
+                            writeResult.maxTimestamp());
                 }
             }
             flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
index 7249cc4e5..ba648f297 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
@@ -42,12 +42,16 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
     // null when the bucket is not for a partition
     @Nullable private final String partitionName;
 
-    // will be null when no any data write, such as for tiering a empty log split
+    // will be null when no data is written, such as for tiering an empty log split
     @Nullable private final WriteResult writeResult;
 
     // the end offset of tiering, should be the last tiered record's offset + 1
     private final long logEndOffset;
 
+    // the max timestamp of tiering, should be the last tiered record's timestamp,
+    // will be -1 for empty log splits or snapshot splits
+    private final long maxTimestamp;
+
     // the total number of write results in one round of tiering,
     // used by the downstream committer operator to determine when all write results
     // for the round of tiering are finished
@@ -59,12 +63,14 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
             @Nullable String partitionName,
             @Nullable WriteResult writeResult,
             long logEndOffset,
+            long maxTimestamp,
             int numberOfWriteResults) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partitionName = partitionName;
         this.writeResult = writeResult;
         this.logEndOffset = logEndOffset;
+        this.maxTimestamp = maxTimestamp;
         this.numberOfWriteResults = numberOfWriteResults;
     }
 
@@ -93,4 +99,8 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
     public long logEndOffset() {
         return logEndOffset;
     }
+
+    public long maxTimestamp() {
+        return maxTimestamp;
+    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
index 37412b472..365176095 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
@@ -85,6 +85,9 @@ public class TableBucketWriteResultSerializer<WriteResult>
         // serialize log end offset
         out.writeLong(tableBucketWriteResult.logEndOffset());
 
+        // serialize max timestamp
+        out.writeLong(tableBucketWriteResult.maxTimestamp());
+
         // serialize number of write results
         out.writeInt(tableBucketWriteResult.numberOfWriteResults());
 
@@ -129,6 +132,8 @@ public class TableBucketWriteResultSerializer<WriteResult>
 
         // deserialize log end offset
         long logEndOffset = in.readLong();
+        // deserialize max timestamp
+        long maxTimestamp = in.readLong();
         // deserialize number of write results
         int numberOfWriteResults = in.readInt();
         return new TableBucketWriteResult<>(
@@ -137,6 +142,7 @@ public class TableBucketWriteResultSerializer<WriteResult>
                 partitionName,
                 writeResult,
                 logEndOffset,
+                maxTimestamp,
                 numberOfWriteResults);
     }
 }
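
The new field is written between the log end offset and the write-result count and
read back from the same position, so writer and reader must stay in lockstep. A
minimal round trip showing that symmetry with plain java.io streams (not the Fluss
serializer itself):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RoundTrip {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buf)) {
                out.writeLong(13L); // log end offset
                out.writeLong(23L); // max timestamp, inserted in the same slot on both sides
                out.writeInt(3);    // number of write results
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                long logEndOffset = in.readLong();
                long maxTimestamp = in.readLong();
                int numberOfWriteResults = in.readInt();
                System.out.println(logEndOffset + " " + maxTimestamp + " " + numberOfWriteResults);
            }
        }
    }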
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index 80e2e4c6f..bcee1861d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -68,6 +68,9 @@ public class TieringSplitReader<WriteResult>
 
     private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
 
+    // unknown bucket timestamp for empty split or snapshot split
+    private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
+
     private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
 
     // the id for the pending tables to be tiered
@@ -285,7 +288,10 @@ public class TieringSplitReader<WriteResult>
                 writeResults.put(
                         bucket,
                         completeLakeWriter(
-                                bucket, currentTieringSplit.getPartitionName(), stoppingOffset));
+                                bucket,
+                                currentTieringSplit.getPartitionName(),
+                                stoppingOffset,
+                                lastRecord.timestamp()));
                 // put split of the bucket
                 finishedSplitIds.put(bucket, currentSplitId);
                 LOG.info("Split {} has been finished.", currentSplitId);
@@ -316,7 +322,10 @@ public class TieringSplitReader<WriteResult>
     }
 
     private TableBucketWriteResult<WriteResult> completeLakeWriter(
-            TableBucket bucket, @Nullable String partitionName, long logEndOffset)
+            TableBucket bucket,
+            @Nullable String partitionName,
+            long logEndOffset,
+            long maxTimestamp)
             throws IOException {
         LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
         WriteResult writeResult = lakeWriter.complete();
@@ -327,6 +336,7 @@ public class TieringSplitReader<WriteResult>
                 partitionName,
                 writeResult,
                 logEndOffset,
+                maxTimestamp,
                 checkNotNull(currentTableNumberOfSplits));
     }
 
@@ -344,6 +354,7 @@ public class TieringSplitReader<WriteResult>
                             logSplit.getPartitionName(),
                             null,
                             logSplit.getStoppingOffset(),
+                            UNKNOWN_BUCKET_TIMESTAMP,
                             logSplit.getNumberOfSplits()));
         }
         return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
@@ -363,7 +374,10 @@ public class TieringSplitReader<WriteResult>
         String splitId = currentTableSplitsByBucket.remove(tableBucket).splitId();
         TableBucketWriteResult<WriteResult> writeResult =
                 completeLakeWriter(
-                        tableBucket, currentSnapshotSplit.getPartitionName(), logEndOffset);
+                        tableBucket,
+                        currentSnapshotSplit.getPartitionName(),
+                        logEndOffset,
+                        UNKNOWN_BUCKET_TIMESTAMP);
         closeCurrentSnapshotSplit();
         mayFinishCurrentTable();
         return new TableBucketWriteResultWithSplitIds(
@@ -483,9 +497,16 @@ public class TieringSplitReader<WriteResult>
             @Nullable String partitionName,
             @Nullable WriteResult writeResult,
             long endLogOffset,
+            long maxTimestamp,
             int numberOfSplits) {
         return new TableBucketWriteResult<>(
-                tablePath, tableBucket, partitionName, writeResult, endLogOffset, numberOfWriteResults);
+                tablePath,
+                tableBucket,
+                partitionName,
+                writeResult,
+                endLogOffset,
+                maxTimestamp,
+                numberOfSplits);
     }
 
     private class TableBucketWriteResultWithSplitIds
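
Log splits take the max timestamp from the last record consumed before the stopping
offset (lastRecord.timestamp() above), while empty log splits and snapshot splits
report UNKNOWN_BUCKET_TIMESTAMP. A compressed sketch of that rule, assuming records
arrive in consumption order (names here are illustrative):

    import java.util.List;

    public class MaxTimestampRule {
        record LogRecord(long offset, long timestamp) {}

        // Mirrors the reader's rule: take the last consumed record's timestamp,
        // or the -1 sentinel when nothing was tiered for the bucket.
        static long maxTimestamp(List<LogRecord> tiered) {
            return tiered.isEmpty() ? -1L : tiered.get(tiered.size() - 1).timestamp();
        }

        public static void main(String[] args) {
            System.out.println(maxTimestamp(List.of(new LogRecord(0, 100), new LogRecord(1, 105)))); // 105
            System.out.println(maxTimestamp(List.of())); // -1
        }
    }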
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index e6c412cad..6f0ab20f4 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -29,6 +29,8 @@ import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LakeTableSnapshot;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.configuration.Configuration;
@@ -56,6 +58,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -108,61 +111,77 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         TableBucket t1b0 = new TableBucket(tableId1, 0);
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath1, t1b0, 1, 11, numberOfWriteResults));
+                        tablePath1, t1b0, 1, 11, 21L, numberOfWriteResults));
         // table1, bucket 1
         TableBucket t1b1 = new TableBucket(tableId1, 1);
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath1, t1b1, 2, 12, numberOfWriteResults));
+                        tablePath1, t1b1, 2, 12, 22L, numberOfWriteResults));
         verifyNoLakeSnapshot(tablePath1);
 
         // table2, bucket0
         TableBucket t2b0 = new TableBucket(tableId2, 0);
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath2, t2b0, 1, 21, numberOfWriteResults));
+                        tablePath2, t2b0, 1, 21, 31L, numberOfWriteResults));
 
         // table2, bucket1
         TableBucket t2b1 = new TableBucket(tableId2, 1);
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath2, t2b1, 2, 22, numberOfWriteResults));
+                        tablePath2, t2b1, 2, 22, 32L, numberOfWriteResults));
         verifyNoLakeSnapshot(tablePath2);
 
         // add table1, bucket2
         TableBucket t1b2 = new TableBucket(tableId1, 2);
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath1, t1b2, 3, 13, numberOfWriteResults));
+                        tablePath1, t1b2, 3, 13, 23L, numberOfWriteResults));
         // verify lake snapshot
         Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
         expectedLogEndOffsets.put(t1b0, 11L);
         expectedLogEndOffsets.put(t1b1, 12L);
         expectedLogEndOffsets.put(t1b2, 13L);
-        verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets);
+        Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
+        expectedMaxTimestamps.put(t1b0, 21L);
+        expectedMaxTimestamps.put(t1b1, 22L);
+        expectedMaxTimestamps.put(t1b2, 23L);
+        verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets, expectedMaxTimestamps);
 
         // add table2, bucket2
         TableBucket t2b2 = new TableBucket(tableId2, 2);
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath2, t2b2, 4, 23, numberOfWriteResults));
+                        tablePath2, t2b2, 4, 23, 33L, numberOfWriteResults));
         expectedLogEndOffsets = new HashMap<>();
         expectedLogEndOffsets.put(t2b0, 21L);
         expectedLogEndOffsets.put(t2b1, 22L);
         expectedLogEndOffsets.put(t2b2, 23L);
-        verifyLakeSnapshot(tablePath2, tableId2, 1, expectedLogEndOffsets);
+        expectedMaxTimestamps = new HashMap<>();
+        expectedMaxTimestamps.put(t2b0, 31L);
+        expectedMaxTimestamps.put(t2b1, 32L);
+        expectedMaxTimestamps.put(t2b2, 33L);
+        verifyLakeSnapshot(tablePath2, tableId2, 1, expectedLogEndOffsets, expectedMaxTimestamps);
 
         // let's process one round of TableBucketWriteResult again
         expectedLogEndOffsets = new HashMap<>();
+        expectedMaxTimestamps = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             TableBucket tableBucket = new TableBucket(tableId1, bucket);
             long offset = bucket * bucket;
+            long timestamp = bucket * bucket;
             committerOperator.processElement(
                     createTableBucketWriteResultStreamRecord(
-                            tablePath1, tableBucket, bucket, offset, numberOfWriteResults));
+                            tablePath1,
+                            tableBucket,
+                            bucket,
+                            offset,
+                            timestamp,
+                            numberOfWriteResults));
             expectedLogEndOffsets.put(tableBucket, offset);
+            expectedMaxTimestamps.put(tableBucket, timestamp);
         }
-        verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets);
+        verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets, expectedMaxTimestamps);
     }
 
     @Test
@@ -172,13 +191,16 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         Map<String, Long> partitionIdByNames =
                 FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
         Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
+        Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
         int numberOfWriteResults = 3 * partitionIdByNames.size();
         long offset = 0;
+        long timestamp = System.currentTimeMillis();
         for (int bucket = 0; bucket < 3; bucket++) {
             for (Map.Entry<String, Long> partitionIdAndNameEntry : partitionIdByNames.entrySet()) {
                 long partitionId = partitionIdAndNameEntry.getValue();
                 TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
                 long currentOffset = offset++;
+                long currentTimestamp = timestamp++;
                 committerOperator.processElement(
                         createTableBucketWriteResultStreamRecord(
                                 tablePath,
@@ -186,11 +208,14 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                                 partitionIdAndNameEntry.getKey(),
                                 1,
                                 currentOffset,
+                                currentTimestamp,
                                 numberOfWriteResults));
                 expectedLogEndOffsets.put(tableBucket, currentOffset);
+                expectedMaxTimestamps.put(tableBucket, currentTimestamp);
             }
             if (bucket == 2) {
-                verifyLakeSnapshot(tablePath, tableId, 1, expectedLogEndOffsets);
+                verifyLakeSnapshot(
+                        tablePath, tableId, 1, expectedLogEndOffsets, expectedMaxTimestamps);
             } else {
                 verifyNoLakeSnapshot(tablePath);
             }
@@ -208,7 +233,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
             TableBucket tableBucket = new TableBucket(tableId, bucket);
             committerOperator.processElement(
                     createTableBucketWriteResultStreamRecord(
-                            tablePath1, tableBucket, null, 3, numberOfWriteResults));
+                            tablePath1, tableBucket, null, 3, 6L, numberOfWriteResults));
         }
 
         verifyNoLakeSnapshot(tablePath1);
@@ -223,16 +248,25 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                             bucket,
                             // just use bucket as log offset
                             bucket,
+                            bucket,
                             numberOfWriteResults));
         }
         committerOperator.processElement(
                 createTableBucketWriteResultStreamRecord(
-                        tablePath1, new TableBucket(tableId, 0), null, 3, numberOfWriteResults));
+                        tablePath1,
+                        new TableBucket(tableId, 0),
+                        null,
+                        3,
+                        6L,
+                        numberOfWriteResults));
 
         Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
         expectedLogEndOffsets.put(new TableBucket(tableId, 1), 1L);
         expectedLogEndOffsets.put(new TableBucket(tableId, 2), 2L);
-        verifyLakeSnapshot(tablePath1, tableId, 1, expectedLogEndOffsets);
+        Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
+        expectedMaxTimestamps.put(new TableBucket(tableId, 1), 1L);
+        expectedMaxTimestamps.put(new TableBucket(tableId, 2), 2L);
+        verifyLakeSnapshot(tablePath1, tableId, 1, expectedLogEndOffsets, expectedMaxTimestamps);
     }
 
     @Test
@@ -256,7 +290,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
             TableBucket tableBucket = new TableBucket(tableId, bucket);
             committerOperator.processElement(
                     createTableBucketWriteResultStreamRecord(
-                            tablePath, tableBucket, 3, 3, numberOfWriteResults));
+                            tablePath, tableBucket, 3, 3, 3L, numberOfWriteResults));
         }
 
         verifyLakeSnapshot(
@@ -264,6 +298,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 tableId,
                 2,
                 getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+                getExpectedMaxTimestamps(tableId, mockCommittedSnapshot),
                 mockCommittedSnapshot.getQualifiedPartitionNameById(),
                 String.format(
                         "The current Fluss's lake snapshot %s is less than 
lake actual snapshot %d committed by Fluss for table: {tablePath=%s, 
tableId=%d},"
@@ -275,16 +310,19 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                         mockCommittedSnapshot));
 
         Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
+        Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             TableBucket tableBucket = new TableBucket(tableId, bucket);
             long offset = bucket * bucket;
+            long timestamp = bucket * bucket;
             committerOperator.processElement(
                     createTableBucketWriteResultStreamRecord(
-                            tablePath, tableBucket, 3, offset, numberOfWriteResults));
+                            tablePath, tableBucket, 3, offset, timestamp, numberOfWriteResults));
             expectedLogEndOffsets.put(tableBucket, offset);
+            expectedMaxTimestamps.put(tableBucket, timestamp);
         }
 
-        verifyLakeSnapshot(tablePath, tableId, 3, expectedLogEndOffsets);
+        verifyLakeSnapshot(tablePath, tableId, 3, expectedLogEndOffsets, expectedMaxTimestamps);
     }
 
     @Test
@@ -316,7 +354,13 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
                 committerOperator.processElement(
                         createTableBucketWriteResultStreamRecord(
-                                tablePath, tableBucket, partitionName, 3, 3, numberOfWriteResults));
+                                tablePath,
+                                tableBucket,
+                                partitionName,
+                                3,
+                                3,
+                                3L,
+                                numberOfWriteResults));
             }
         }
 
@@ -325,6 +369,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 tableId,
                 3,
                 getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+                getExpectedMaxTimestamps(tableId, mockCommittedSnapshot),
                 mockCommittedSnapshot.getQualifiedPartitionNameById(),
                 String.format(
                         "The current Fluss's lake snapshot %s is less than 
lake actual snapshot %d committed by Fluss for table: {tablePath=%s, 
tableId=%d}, missing snapshot: %s.",
@@ -373,15 +418,38 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         return expectedLogEndOffsets;
     }
 
+    private Map<TableBucket, Long> getExpectedMaxTimestamps(
+            long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
+        Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
+        for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
+                committedLakeSnapshot.getLogEndOffsets().entrySet()) {
+            Tuple2<Long, Integer> partitionBucket = entry.getKey();
+            if (partitionBucket.f0 == null) {
+                expectedMaxTimestamps.put(new TableBucket(tableId, partitionBucket.f1), -1L);
+            } else {
+                expectedMaxTimestamps.put(
+                        new TableBucket(tableId, partitionBucket.f0, partitionBucket.f1), -1L);
+            }
+        }
+        return expectedMaxTimestamps;
+    }
+
     private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
             createTableBucketWriteResultStreamRecord(
                     TablePath tablePath,
                     TableBucket tableBucket,
                     @Nullable Integer writeResult,
                     long logEndOffset,
+                    long maxTimestamp,
                     int numberOfWriteResults) {
         return createTableBucketWriteResultStreamRecord(
-                tablePath, tableBucket, null, writeResult, logEndOffset, numberOfWriteResults);
+                tablePath,
+                tableBucket,
+                null,
+                writeResult,
+                logEndOffset,
+                maxTimestamp,
+                numberOfWriteResults);
     }
 
     private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
@@ -391,6 +459,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                     @Nullable String partitionName,
                     @Nullable Integer writeResult,
                     long logEndOffset,
+                    long maxTimestamp,
                     int numberOfWriteResults) {
         TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
                 new TableBucketWriteResult<>(
@@ -399,6 +468,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                         partitionName,
                         writeResult == null ? null : new TestingWriteResult(writeResult),
                         logEndOffset,
+                        maxTimestamp,
                         numberOfWriteResults);
         return new StreamRecord<>(tableBucketWriteResult);
     }
@@ -413,13 +483,17 @@ class TieringCommitOperatorTest extends FlinkTestBase {
             TablePath tablePath,
             long tableId,
             long expectedSnapshotId,
-            Map<TableBucket, Long> expectedLogEndOffsets)
+            Map<TableBucket, Long> expectedLogEndOffsets,
+            Map<TableBucket, Long> expectedMaxTimestamp)
             throws Exception {
         LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
         assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(expectedSnapshotId);
         assertThat(lakeSnapshot.getTableBucketsOffset()).isEqualTo(expectedLogEndOffsets);
 
-        // check the tableId has been send to mark finished
+        // TODO: replace with LakeSnapshot once it supports max timestamp
+        verifyLakeSnapshotMaxTimestamp(tableId, expectedMaxTimestamp);
+
+        // check the tableId has been sent to mark finished
         List<OperatorEvent> operatorEvents = mockOperatorEventGateway.getEventsSent();
         SourceEventWrapper sourceEventWrapper =
                 (SourceEventWrapper) operatorEvents.get(operatorEvents.size() - 1);
@@ -433,6 +507,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
             long tableId,
             long expectedSnapshotId,
             Map<TableBucket, Long> expectedLogEndOffsets,
+            Map<TableBucket, Long> expectedMaxTimestamp,
             Map<Long, String> expectedPartitionIdByName,
             String failedReason)
             throws Exception {
@@ -441,7 +516,10 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         assertThat(lakeSnapshot.getTableBucketsOffset()).isEqualTo(expectedLogEndOffsets);
         assertThat(lakeSnapshot.getPartitionNameById()).isEqualTo(expectedPartitionIdByName);
 
-        // check the tableId has been send to mark failed
+        // TODO: replace with LakeSnapshot once it supports max timestamp
+        verifyLakeSnapshotMaxTimestamp(tableId, expectedMaxTimestamp);
+
+        // check the tableId has been sent to mark failed
         List<OperatorEvent> operatorEvents = mockOperatorEventGateway.getEventsSent();
         SourceEventWrapper sourceEventWrapper =
                 (SourceEventWrapper) operatorEvents.get(operatorEvents.size() - 1);
@@ -451,6 +529,15 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         assertThat(finishTieringEvent.failReason()).contains(failedReason);
     }
 
+    private void verifyLakeSnapshotMaxTimestamp(
+            long tableId, Map<TableBucket, Long> expectedMaxTimestamp) throws Exception {
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        Optional<LakeTableSnapshot> lakeTableSnapshotOpt = zkClient.getLakeTableSnapshot(tableId);
+        assertThat(lakeTableSnapshotOpt).isNotEmpty();
+        LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotOpt.get();
+        assertThat(lakeTableSnapshot.getBucketMaxTimestamp()).isEqualTo(expectedMaxTimestamp);
+    }
+
     private static class MockOperatorEventDispatcher implements OperatorEventDispatcher {
 
         private final OperatorEventGateway operatorEventGateway;
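
Note on the verification strategy: the client-facing LakeSnapshot does not yet expose
the max timestamp (hence the TODO markers above), so the test reads ZooKeeper
directly, fetching the LakeTableSnapshot via ZooKeeperClient.getLakeTableSnapshot(tableId)
and comparing getBucketMaxTimestamp() against the expected per-bucket values.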
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
index 2c718cbae..dbb40eae1 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
@@ -44,7 +44,7 @@ class TableBucketWriteResultSerializerTest {
         String partitionName = isPartitioned ? "partition1" : null;
         TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
                 new TableBucketWriteResult<>(
-                        tablePath, tableBucket, partitionName, testingWriteResult, 10, 20);
+                        tablePath, tableBucket, partitionName, testingWriteResult, 10, 30L, 20);
 
         // test serialize and deserialize
         byte[] serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
@@ -63,7 +63,8 @@ class TableBucketWriteResultSerializerTest {
 
         // verify when writeResult is null
         tableBucketWriteResult =
-                new TableBucketWriteResult<>(tablePath, tableBucket, partitionName, null, 20, 30);
+                new TableBucketWriteResult<>(
+                        tablePath, tableBucket, partitionName, null, 20, 30L, 30);
         serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
         deserialized =
                 tableBucketWriteResultSerializer.deserialize(
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index 2ddaa44e8..19e401095 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -449,6 +449,7 @@ message PbLakeTableOffsetForBucket {
   optional int64 log_start_offset = 3;
   optional int64 log_end_offset = 4;
   optional string partition_name = 5;
+  optional int64 max_timestamp = 6;
 }
 
 message CommitLakeTableSnapshotResponse {
@@ -473,6 +474,7 @@ message PbNotifyLakeTableOffsetReqForBucket {
   required int64 snapshot_id = 4;
   optional int64 log_start_offset = 5;
   optional int64 log_end_offset = 6;
+  optional int64 max_timestamp = 7;
 }
 
 message NotifyLakeTableOffsetResponse {
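
Both additions are optional fields with fresh tag numbers, so the wire format stays
backward compatible: senders that predate this change simply leave max_timestamp
unset, and the decoding paths in ServerRpcMessageUtils below guard each read with
hasMaxTimestamp() before falling back to null.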
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java b/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
index 9ccc46a9e..5692245a4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
@@ -25,11 +25,14 @@ public class LakeBucketOffset {
 
     private final Long logStartOffset;
     private final Long logEndOffset;
+    private final Long maxTimestamp;
 
-    public LakeBucketOffset(long snapshotId, Long logStartOffset, Long logEndOffset) {
+    public LakeBucketOffset(
+            long snapshotId, Long logStartOffset, Long logEndOffset, Long maxTimestamp) {
         this.snapshotId = snapshotId;
         this.logStartOffset = logStartOffset;
         this.logEndOffset = logEndOffset;
+        this.maxTimestamp = maxTimestamp;
     }
 
     public long getSnapshotId() {
@@ -43,4 +46,8 @@ public class LakeBucketOffset {
     public Optional<Long> getLogEndOffset() {
         return Optional.ofNullable(logEndOffset);
     }
+
+    public Optional<Long> getMaxTimestamp() {
+        return Optional.ofNullable(maxTimestamp);
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 1470081a8..b4c8f1c9a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -125,6 +125,7 @@ public final class LogTablet {
     // note: currently, for the primary key table, the log start offset is never updated
     private volatile long lakeLogStartOffset = Long.MAX_VALUE;
     private volatile long lakeLogEndOffset = -1L;
+    private volatile long lakeMaxTimestamp = -1;
 
     private LogTablet(
             PhysicalTablePath physicalPath,
@@ -250,6 +251,10 @@ public final class LogTablet {
         return lakeLogEndOffset;
     }
 
+    public long getLakeMaxTimestamp() {
+        return lakeMaxTimestamp;
+    }
+
     public int getWriterIdCount() {
         return writerStateManager.writerIdCount();
     }
@@ -526,6 +531,12 @@ public final class LogTablet {
         }
     }
 
+    public void updateLakeMaxTimestamp(long lakeMaxTimestamp) {
+        if (lakeMaxTimestamp > this.lakeMaxTimestamp) {
+            this.lakeMaxTimestamp = lakeMaxTimestamp;
+        }
+    }
+
     public void loadWriterSnapshot(long lastOffset) throws IOException {
         synchronized (lock) {
             rebuildWriterState(lastOffset, writerStateManager);
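
updateLakeMaxTimestamp only ever advances the value, so a late or repeated
notification carrying a smaller timestamp cannot move it backwards. A stripped-down
sketch of the same guard (like LogTablet it assumes updates do not race; under
contention a CAS loop would be needed):

    public class MonotonicMax {
        private volatile long lakeMaxTimestamp = -1L;

        // Only advance: a stale notification with a smaller timestamp is a no-op.
        public void update(long candidate) {
            if (candidate > lakeMaxTimestamp) {
                lakeMaxTimestamp = candidate;
            }
        }

        public long get() {
            return lakeMaxTimestamp;
        }

        public static void main(String[] args) {
            MonotonicMax m = new MonotonicMax();
            m.update(100L);
            m.update(90L); // ignored, the value never regresses
            System.out.println(m.get()); // 100
        }
    }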
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 876e8e984..f11fe1399 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -759,6 +759,10 @@ public class ReplicaManager {
                                 .getLogEndOffset()
                                 .ifPresent(logTablet::updateLakeLogEndOffset);
 
+                        lakeBucketOffset
+                                .getMaxTimestamp()
+                                .ifPresent(logTablet::updateLakeMaxTimestamp);
+
                         responseCallback.accept(new NotifyLakeTableOffsetResponse());
                     }
                 });
@@ -818,6 +822,10 @@ public class ReplicaManager {
             lakeTableSnapshot
                     .getLogEndOffset(tb)
                     .ifPresent(replica.getLogTablet()::updateLakeLogEndOffset);
+
+            lakeTableSnapshot
+                    .getMaxTimestamp(tb)
+                    .ifPresent(replica.getLogTablet()::updateLakeMaxTimestamp);
         }
     }
 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index f77cf378e..823c2bb52 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1428,6 +1428,7 @@ public class ServerRpcMessageUtils {
             long snapshotId = pdLakeTableSnapshotInfo.getSnapshotId();
             Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
             Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+            Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
             Map<Long, String> partitionNameByPartitionId = new HashMap<>();
 
             for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
@@ -1447,8 +1448,13 @@ public class ServerRpcMessageUtils {
                         lakeTableOffsetForBucket.hasLogEndOffset()
                                 ? lakeTableOffsetForBucket.getLogEndOffset()
                                 : null;
+                Long logMaxTimestamp =
+                        lakeTableOffsetForBucket.hasMaxTimestamp()
+                                ? lakeTableOffsetForBucket.getMaxTimestamp()
+                                : null;
                 bucketLogStartOffset.put(tableBucket, logStartOffset);
                 bucketLogEndOffset.put(tableBucket, logEndOffset);
+                bucketMaxTimestamp.put(tableBucket, logMaxTimestamp);
 
                 if (lakeTableOffsetForBucket.hasPartitionName()) {
                     partitionNameByPartitionId.put(
@@ -1462,6 +1468,7 @@ public class ServerRpcMessageUtils {
                             tableId,
                             bucketLogStartOffset,
                             bucketLogEndOffset,
+                            bucketMaxTimestamp,
                             partitionNameByPartitionId));
         }
         return new CommitLakeTableSnapshotData(lakeTableInfoByTableId);
@@ -1483,6 +1490,8 @@ public class ServerRpcMessageUtils {
 
         lakeTableSnapshot.getLogEndOffset(tableBucket).ifPresent(reqForBucket::setLogEndOffset);
 
+        lakeTableSnapshot.getMaxTimestamp(tableBucket).ifPresent(reqForBucket::setMaxTimestamp);
+
         return reqForBucket;
     }
 
@@ -1502,8 +1511,11 @@ public class ServerRpcMessageUtils {
                     reqForBucket.hasLogStartOffset() ? reqForBucket.getLogStartOffset() : null;
             Long logEndOffset =
                     reqForBucket.hasLogEndOffset() ? reqForBucket.getLogEndOffset() : null;
+            Long maxTimestamp =
+                    reqForBucket.hasMaxTimestamp() ? reqForBucket.getMaxTimestamp() : null;
             lakeBucketOffsetMap.put(
-                    tableBucket, new LakeBucketOffset(snapshotId, logStartOffset, logEndOffset));
+                    tableBucket,
+                    new LakeBucketOffset(snapshotId, logStartOffset, logEndOffset, maxTimestamp));
         }
 
         return new NotifyLakeTableOffsetData(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index d555cd1df..98654972a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -1048,6 +1048,11 @@ public class ZooKeeperClient implements AutoCloseable {
                     new HashMap<>(previous.getBucketLogEndOffset());
             bucketLogEndOffset.putAll(lakeTableSnapshot.getBucketLogEndOffset());
 
+            // merge max timestamp, current will override the previous
+            Map<TableBucket, Long> bucketMaxTimestamp =
+                    new HashMap<>(previous.getBucketMaxTimestamp());
+            bucketMaxTimestamp.putAll(lakeTableSnapshot.getBucketMaxTimestamp());
+
             Map<Long, String> partitionNameById =
                     new HashMap<>(previous.getPartitionNameIdByPartitionId());
             partitionNameById.putAll(lakeTableSnapshot.getPartitionNameIdByPartitionId());
@@ -1058,6 +1063,7 @@ public class ZooKeeperClient implements AutoCloseable {
                             lakeTableSnapshot.getTableId(),
                             bucketLogStartOffset,
                             bucketLogEndOffset,
+                            bucketMaxTimestamp,
                             partitionNameById);
             zkClient.setData().forPath(path, LakeTableZNode.encode(lakeTableSnapshot));
         } else {
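
The merge keeps buckets that only the previous snapshot mentions while the current
snapshot wins per bucket, and max timestamps now merge exactly like offsets. A tiny
demonstration of that copy-then-putAll pattern:

    import java.util.HashMap;
    import java.util.Map;

    public class MergeSemantics {
        public static void main(String[] args) {
            Map<Integer, Long> previous = Map.of(0, 100L, 1, 110L);
            Map<Integer, Long> current = Map.of(1, 120L, 2, 130L);

            // Same pattern as ZooKeeperClient: start from the previous snapshot,
            // then let the current one override per bucket.
            Map<Integer, Long> merged = new HashMap<>(previous);
            merged.putAll(current);

            System.out.println(merged); // {0=100, 1=120, 2=130} (iteration order may vary)
        }
    }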
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
index 4abccc1b3..9d65e3bf0 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
@@ -32,10 +32,11 @@ public class LakeTableSnapshot {
 
     // the log offset of the bucket
 
-    // mapping from bucket id to log start/end offset,
+    // mapping from bucket id to log start/end offset or max timestamp,
     // will be null if log offset is unknown such as reading the snapshot of primary key table
     private final Map<TableBucket, Long> bucketLogStartOffset;
     private final Map<TableBucket, Long> bucketLogEndOffset;
+    private final Map<TableBucket, Long> bucketMaxTimestamp;
 
     // mapping from partition id to partition name, will be empty if the table is not a
     // partitioned table
@@ -46,11 +47,13 @@ public class LakeTableSnapshot {
             long tableId,
             Map<TableBucket, Long> bucketLogStartOffset,
             Map<TableBucket, Long> bucketLogEndOffset,
+            Map<TableBucket, Long> bucketMaxTimestamp,
             Map<Long, String> partitionNameIdByPartitionId) {
         this.snapshotId = snapshotId;
         this.tableId = tableId;
         this.bucketLogStartOffset = bucketLogStartOffset;
         this.bucketLogEndOffset = bucketLogEndOffset;
+        this.bucketMaxTimestamp = bucketMaxTimestamp;
         this.partitionNameIdByPartitionId = partitionNameIdByPartitionId;
     }
 
@@ -78,6 +81,10 @@ public class LakeTableSnapshot {
         return Optional.ofNullable(bucketLogEndOffset.get(tableBucket));
     }
 
+    public Optional<Long> getMaxTimestamp(TableBucket tableBucket) {
+        return Optional.ofNullable(bucketMaxTimestamp.get(tableBucket));
+    }
+
     public Map<TableBucket, Long> getBucketLogEndOffset() {
         return bucketLogEndOffset;
     }
@@ -86,6 +93,10 @@ public class LakeTableSnapshot {
         return bucketLogStartOffset;
     }
 
+    public Map<TableBucket, Long> getBucketMaxTimestamp() {
+        return bucketMaxTimestamp;
+    }
+
     public Map<Long, String> getPartitionNameIdByPartitionId() {
         return partitionNameIdByPartitionId;
     }
@@ -100,6 +111,7 @@ public class LakeTableSnapshot {
                 && tableId == that.tableId
                 && Objects.equals(bucketLogStartOffset, that.bucketLogStartOffset)
                 && Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset)
+                && Objects.equals(bucketMaxTimestamp, that.bucketMaxTimestamp)
                 && Objects.equals(partitionNameIdByPartitionId, that.partitionNameIdByPartitionId);
     }
 
@@ -110,6 +122,7 @@ public class LakeTableSnapshot {
                 tableId,
                 bucketLogStartOffset,
                 bucketLogEndOffset,
+                bucketMaxTimestamp,
                 partitionNameIdByPartitionId);
     }
 
@@ -124,6 +137,8 @@ public class LakeTableSnapshot {
                 + bucketLogStartOffset
                 + ", bucketLogEndOffset="
                 + bucketLogEndOffset
+                + ", bucketMaxTimestamp="
+                + bucketMaxTimestamp
                 + ", partitionNameIdByPartitionId="
                 + partitionNameIdByPartitionId
                 + '}';
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
index 111332b25..539da5e02 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
@@ -43,6 +43,7 @@ public class LakeTableSnapshotJsonSerde
     private static final String BUCKET_ID = "bucket_id";
     private static final String LOG_START_OFFSET = "log_start_offset";
     private static final String LOG_END_OFFSET = "log_end_offset";
+    private static final String MAX_TIMESTAMP = "max_timestamp";
     private static final String PARTITION_NAME = "partition_name";
 
     private static final int VERSION = 1;
@@ -82,6 +83,11 @@ public class LakeTableSnapshotJsonSerde
                         LOG_END_OFFSET, lakeTableSnapshot.getLogEndOffset(tableBucket).get());
             }
 
+            if (lakeTableSnapshot.getMaxTimestamp(tableBucket).isPresent()) {
+                generator.writeNumberField(
+                        MAX_TIMESTAMP, lakeTableSnapshot.getMaxTimestamp(tableBucket).get());
+            }
+
             generator.writeEndObject();
         }
         generator.writeEndArray();
@@ -100,6 +106,7 @@ public class LakeTableSnapshotJsonSerde
         Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
         Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
         Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
         Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
         while (buckets.hasNext()) {
             JsonNode bucket = buckets.next();
@@ -120,6 +127,12 @@ public class LakeTableSnapshotJsonSerde
                 bucketLogEndOffset.put(tableBucket, null);
             }
 
+            if (bucket.get(MAX_TIMESTAMP) != null) {
+                bucketMaxTimestamp.put(tableBucket, bucket.get(MAX_TIMESTAMP).asLong());
+            } else {
+                bucketMaxTimestamp.put(tableBucket, null);
+            }
+
             if (partitionId != null && bucket.get(PARTITION_NAME) != null) {
                 partitionNameIdByPartitionId.put(
                         tableBucket.getPartitionId(), bucket.get(PARTITION_NAME).asText());
@@ -130,6 +143,7 @@ public class LakeTableSnapshotJsonSerde
                 tableId,
                 bucketLogStartOffset,
                 bucketLogEndOffset,
+                bucketMaxTimestamp,
                 partitionNameIdByPartitionId);
     }
 }
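
For illustration, a lake table snapshot znode serialized with the new field could
look roughly like this; the values are hypothetical, and the top-level key names
other than version and buckets are assumptions, since only the per-bucket field
constants appear in this diff. max_timestamp is simply omitted when unknown, which
the deserializer maps back to null:

    {
      "version": 1,
      "snapshot_id": 1,
      "table_id": 1001,
      "buckets": [
        {"bucket_id": 0, "log_start_offset": 0, "log_end_offset": 11, "max_timestamp": 21},
        {"bucket_id": 1, "log_start_offset": 0, "log_end_offset": 12}
      ]
    }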
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
index faa4f4a6c..88f3008fa 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
@@ -104,21 +104,25 @@ class CommitLakeTableSnapshotITCase {
         long snapshotId = 1;
         long dataLakeLogStartOffset = 0;
         long dataLakeLogEndOffset = 50;
+        long dataLakeMaxTimestamp = System.currentTimeMillis();
         CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
                 genCommitLakeTableSnapshotRequest(
                         tableId,
                         BUCKET_NUM,
                         snapshotId,
                         dataLakeLogStartOffset,
-                        dataLakeLogEndOffset);
+                        dataLakeLogEndOffset,
+                        dataLakeMaxTimestamp);
         coordinatorGateway.commitLakeTableSnapshot(commitLakeTableSnapshotRequest).get();
 
         Map<TableBucket, Long> bucketsLogStartOffset = new HashMap<>();
         Map<TableBucket, Long> bucketsLogEndOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketsMaxTimestamp = new HashMap<>();
         for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
             TableBucket tb = new TableBucket(tableId, bucket);
             bucketsLogStartOffset.put(tb, dataLakeLogStartOffset);
             bucketsLogEndOffset.put(tb, dataLakeLogEndOffset);
+            bucketsMaxTimestamp.put(tb, dataLakeMaxTimestamp);
             Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
             retry(
                     Duration.ofMinutes(2),
@@ -127,6 +131,7 @@ class CommitLakeTableSnapshotITCase {
                         assertThat(logTablet.getLakeLogStartOffset())
                                 .isEqualTo(dataLakeLogStartOffset);
                         assertThat(logTablet.getLakeLogEndOffset()).isEqualTo(dataLakeLogEndOffset);
+                        assertThat(logTablet.getLakeMaxTimestamp()).isEqualTo(dataLakeMaxTimestamp);
                     });
         }
 
@@ -136,6 +141,7 @@ class CommitLakeTableSnapshotITCase {
                         tableId,
                         bucketsLogStartOffset,
                         bucketsLogEndOffset,
+                        bucketsMaxTimestamp,
                         Collections.emptyMap());
         checkLakeTableDataInZk(tableId, expectedDataLakeTieredInfo);
     }
@@ -146,7 +152,12 @@ class CommitLakeTableSnapshotITCase {
     }
 
    private static CommitLakeTableSnapshotRequest genCommitLakeTableSnapshotRequest(
-            long tableId, int buckets, long snapshotId, long logStartOffset, long logEndOffset) {
+            long tableId,
+            int buckets,
+            long snapshotId,
+            long logStartOffset,
+            long logEndOffset,
+            long maxTimestamp) {
         CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
                 new CommitLakeTableSnapshotRequest();
         PbLakeTableSnapshotInfo reqForTable = commitLakeTableSnapshotRequest.addTablesReq();
@@ -161,6 +172,7 @@ class CommitLakeTableSnapshotITCase {
             lakeTableOffsetForBucket.setBucketId(tb.getBucket());
             lakeTableOffsetForBucket.setLogStartOffset(logStartOffset);
             lakeTableOffsetForBucket.setLogEndOffset(logEndOffset);
+            lakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
         }
         return commitLakeTableSnapshotRequest;
     }
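
For context on the value being committed: the test fabricates a single
maxTimestamp for every bucket, whereas the tiering service reports the
largest record timestamp observed while writing each bucket. A minimal
sketch of that accumulation under that reading (the accumulator class is
illustrative, not the Fluss API):

    import java.util.Arrays;
    import java.util.List;

    public class MaxTimestampAccumulatorDemo {
        // Track the max record timestamp seen so far for one bucket.
        private long maxTimestamp = Long.MIN_VALUE;

        void onRecord(long recordTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, recordTimestamp);
        }

        public static void main(String[] args) {
            MaxTimestampAccumulatorDemo acc = new MaxTimestampAccumulatorDemo();
            List<Long> recordTimestamps = Arrays.asList(10L, 42L, 7L);
            recordTimestamps.forEach(acc::onRecord);
            System.out.println(acc.maxTimestamp); // 42
        }
    }
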
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
index 57270c5e2..0d0b25204 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
@@ -43,30 +43,37 @@ class NotifyReplicaLakeTableOffsetTest extends ReplicaTestBase {
         Replica replica = replicaManager.getReplicaOrException(tb);
 
         // now, notify lake table offset
-        notifyAndVerify(tb, replica, 1, 0L, 20L);
+        notifyAndVerify(tb, replica, 1, 0L, 20L, System.currentTimeMillis());
         // notify again
-        notifyAndVerify(tb, replica, 2, 20L, 30L);
+        notifyAndVerify(tb, replica, 2, 20L, 30L, System.currentTimeMillis());
     }
 
     private void notifyAndVerify(
-            TableBucket tb, Replica replica, long snapshotId, long startOffset, long endOffset)
+            TableBucket tb,
+            Replica replica,
+            long snapshotId,
+            long startOffset,
+            long endOffset,
+            long maxTimestamp)
             throws Exception {
         NotifyLakeTableOffsetData notifyLakeTableOffsetData =
-                getNotifyLakeTableOffset(tb, snapshotId, startOffset, endOffset);
+                getNotifyLakeTableOffset(tb, snapshotId, startOffset, endOffset, maxTimestamp);
         CompletableFuture<NotifyLakeTableOffsetResponse> future = new CompletableFuture<>();
         replicaManager.notifyLakeTableOffset(notifyLakeTableOffsetData, future::complete);
         future.get();
-        verifyLakeTableOffset(replica, snapshotId, startOffset, endOffset);
+        verifyLakeTableOffset(replica, snapshotId, startOffset, endOffset, maxTimestamp);
     }
 
     private void verifyLakeTableOffset(
-            Replica replica, long snapshotId, long startOffset, long endOffset) {
+            Replica replica, long snapshotId, long startOffset, long endOffset, long maxTimestamp) {
         AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeTableSnapshotId())
                 .isEqualTo(snapshotId);
         AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogStartOffset())
                 .isEqualTo(startOffset);
         AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogEndOffset())
                 .isEqualTo(endOffset);
+        AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeMaxTimestamp())
+                .isEqualTo(maxTimestamp);
     }
 
     private TableBucket makeTableBucket(boolean partitionTable) {
@@ -82,10 +89,15 @@ class NotifyReplicaLakeTableOffsetTest extends ReplicaTestBase {
     }
 
     private NotifyLakeTableOffsetData getNotifyLakeTableOffset(
-            TableBucket tableBucket, long snapshotId, long startOffset, long endOffset) {
+            TableBucket tableBucket,
+            long snapshotId,
+            long startOffset,
+            long endOffset,
+            long maxTimestamp) {
         return new NotifyLakeTableOffsetData(
                 1,
                 Collections.singletonMap(
-                        tableBucket, new LakeBucketOffset(snapshotId, startOffset, endOffset)));
+                        tableBucket,
+                        new LakeBucketOffset(snapshotId, startOffset, endOffset, maxTimestamp)));
     }
 }
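
One pattern worth noting in this test: replicaManager.notifyLakeTableOffset
takes a response callback, and passing future::complete turns it into a
blocking future.get(). A minimal sketch of the same callback-to-future
bridge, assuming a generic callback-style service (Service and notifyOffset
are illustrative stand-ins, not Fluss classes):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    public class CallbackToFutureDemo {
        interface Service {
            void notifyOffset(long offset, Consumer<String> responseCallback);
        }

        public static void main(String[] args) throws Exception {
            Service service =
                    (offset, callback) -> callback.accept("acked offset " + offset);

            CompletableFuture<String> future = new CompletableFuture<>();
            // Mirrors replicaManager.notifyLakeTableOffset(data, future::complete):
            // the callback completes the future, and get() waits for the response.
            service.notifyOffset(30L, future::complete);
            System.out.println(future.get());
        }
    }
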
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
index 306b0320e..282c00d4b 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
@@ -39,6 +39,7 @@ class LakeTableSnapshotJsonSerdeTest extends JsonSerdeTestBase<LakeTableSnapshot
                         1L,
                         Collections.emptyMap(),
                         Collections.emptyMap(),
+                        Collections.emptyMap(),
                         Collections.emptyMap());
 
         long tableId = 4;
@@ -48,6 +49,9 @@ class LakeTableSnapshotJsonSerdeTest extends JsonSerdeTestBase<LakeTableSnapshot
         Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
         bucketLogEndOffset.put(new TableBucket(tableId, 1), 3L);
         bucketLogEndOffset.put(new TableBucket(tableId, 2), 4L);
+        Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+        bucketMaxTimestamp.put(new TableBucket(tableId, 1), 5L);
+        bucketMaxTimestamp.put(new TableBucket(tableId, 2), 6L);
 
         LakeTableSnapshot lakeTableSnapshot2 =
                 new LakeTableSnapshot(
@@ -55,6 +59,7 @@ class LakeTableSnapshotJsonSerdeTest extends JsonSerdeTestBase<LakeTableSnapshot
                         tableId,
                         bucketLogStartOffset,
                         bucketLogEndOffset,
+                        bucketMaxTimestamp,
                         Collections.emptyMap());
 
         tableId = 5;
@@ -69,12 +74,17 @@ class LakeTableSnapshotJsonSerdeTest extends JsonSerdeTestBase<LakeTableSnapshot
         bucketLogEndOffset.put(new TableBucket(tableId, 1L, 1), 3L);
         bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 4L);
 
+        bucketMaxTimestamp = new HashMap<>();
+        bucketMaxTimestamp.put(new TableBucket(tableId, 1L, 1), 5L);
+        bucketMaxTimestamp.put(new TableBucket(tableId, 2L, 1), 6L);
+
         LakeTableSnapshot lakeTableSnapshot3 =
                 new LakeTableSnapshot(
                         3,
                         tableId,
                         bucketLogStartOffset,
                         bucketLogEndOffset,
+                        bucketMaxTimestamp,
                         partitionNameIdByPartitionId);
 
         return new LakeTableSnapshot[] {
@@ -87,11 +97,11 @@ class LakeTableSnapshotJsonSerdeTest extends JsonSerdeTestBase<LakeTableSnapshot
         return new String[] {
             "{\"version\":1,\"snapshot_id\":1,\"table_id\":1,\"buckets\":[]}",
             "{\"version\":1,\"snapshot_id\":2,\"table_id\":4,"
-                    + "\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4},"
-                    + "{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3}]}",
+                    + "\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4,\"max_timestamp\":6},"
+                    + "{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3,\"max_timestamp\":5}]}",
             "{\"version\":1,\"snapshot_id\":3,\"table_id\":5,"
-                    + "\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3},"
-                    + "{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4}]}"
+                    + "\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3,\"max_timestamp\":5},"
+                    + "{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4,\"max_timestamp\":6}]}"
         };
     }
 }
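
The expected strings above pin the exact serialized layout, so adding the
max_timestamp key means updating the golden JSON in lockstep with the serde.
A minimal sketch of that golden-string round-trip pattern, assuming Jackson
(the Snapshot POJO is illustrative, not the Fluss serde):

    import com.fasterxml.jackson.annotation.JsonPropertyOrder;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class GoldenJsonDemo {
        @JsonPropertyOrder({"version", "snapshot_id", "table_id"})
        public static class Snapshot {
            public int version = 1;
            public long snapshot_id = 1;
            public long table_id = 1;
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String golden = "{\"version\":1,\"snapshot_id\":1,\"table_id\":1}";

            // Serializing must reproduce the pinned golden string exactly ...
            String actual = mapper.writeValueAsString(new Snapshot());
            if (!golden.equals(actual)) {
                throw new AssertionError("serde drift: " + actual);
            }
            // ... and parsing the golden string back must restore the fields.
            Snapshot parsed = mapper.readValue(golden, Snapshot.class);
            System.out.println(parsed.table_id); // 1
        }
    }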
