This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new de2d9a042 [lake] Move parse offsets from snapshot property to common 
method (#2208)
de2d9a042 is described below

commit de2d9a042637386f9c8f59bddbff8a1a66853ca0
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Dec 22 16:17:43 2025 +0800

    [lake] Move parse offsets from snapshot property to common method (#2208)
---
 .../lake/committer/CommittedLakeSnapshot.java      | 35 +++++----------
 .../committer/FlussTableLakeSnapshotCommitter.java | 14 ++----
 .../tiering/committer/TieringCommitOperator.java   | 51 +++++++++++++++++++---
 .../FlussTableLakeSnapshotCommitterTest.java       | 19 +++-----
 .../committer/TieringCommitOperatorTest.java       | 51 +++++++++-------------
 .../lake/iceberg/tiering/IcebergLakeCommitter.java | 32 +-------------
 .../lake/lance/tiering/LanceLakeCommitter.java     | 29 ++----------
 .../fluss/lake/lance/tiering/LanceTieringTest.java | 14 +++++-
 .../lake/paimon/tiering/PaimonLakeCommitter.java   | 26 +----------
 .../lake/paimon/tiering/PaimonTieringTest.java     | 12 ++++-
 10 files changed, 116 insertions(+), 167 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
 
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
index 7b9e2f4cc..dfc67b2e4 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
@@ -17,41 +17,30 @@
 
 package org.apache.fluss.lake.committer;
 
-import org.apache.fluss.utils.types.Tuple2;
-
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * The lake already committed snapshot, containing the lake snapshot id and 
the bucket end offsets
- * in this snapshot.
+ * The lake already committed snapshot, containing the lake snapshot id and 
the properties stored in
+ * this snapshot.
  */
 public class CommittedLakeSnapshot {
 
     private final long lakeSnapshotId;
-    // <partition_id, bucket> -> log offset, partition_id will be null if it's 
not a
-    // partition bucket
-    private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new 
HashMap<>();
 
-    public CommittedLakeSnapshot(long lakeSnapshotId) {
+    private final Map<String, String> snapshotProperties;
+
+    public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> 
snapshotProperties) {
         this.lakeSnapshotId = lakeSnapshotId;
+        this.snapshotProperties = snapshotProperties;
     }
 
     public long getLakeSnapshotId() {
         return lakeSnapshotId;
     }
 
-    public void addBucket(int bucketId, long offset) {
-        logEndOffsets.put(Tuple2.of(null, bucketId), offset);
-    }
-
-    public void addPartitionBucket(Long partitionId, int bucketId, long 
offset) {
-        logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
-    }
-
-    public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
-        return logEndOffsets;
+    public Map<String, String> getSnapshotProperties() {
+        return snapshotProperties;
     }
 
     @Override
@@ -64,12 +53,12 @@ public class CommittedLakeSnapshot {
         }
         CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
         return lakeSnapshotId == that.lakeSnapshotId
-                && Objects.equals(logEndOffsets, that.logEndOffsets);
+                && Objects.equals(snapshotProperties, that.snapshotProperties);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(lakeSnapshotId, logEndOffsets);
+        return Objects.hash(lakeSnapshotId, snapshotProperties);
     }
 
     @Override
@@ -77,8 +66,8 @@ public class CommittedLakeSnapshot {
         return "CommittedLakeSnapshot{"
                 + "lakeSnapshotId="
                 + lakeSnapshotId
-                + ", logEndOffsets="
-                + logEndOffsets
+                + ", snapshotProperties="
+                + snapshotProperties
                 + '}';
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index b78614dc4..f4be12593 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -20,7 +20,6 @@ package org.apache.fluss.flink.tiering.committer;
 import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.GatewayClientProxy;
@@ -31,7 +30,6 @@ import 
org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
 import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 import org.apache.fluss.utils.ExceptionUtils;
-import org.apache.fluss.utils.types.Tuple2;
 
 import java.io.IOException;
 import java.util.Map;
@@ -75,17 +73,13 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
         }
     }
 
-    public void commit(long tableId, CommittedLakeSnapshot 
committedLakeSnapshot)
+    public void commit(long tableId, long snapshotId, Map<TableBucket, Long> 
logEndOffsets)
             throws IOException {
         // construct lake snapshot to commit to Fluss
         FlussTableLakeSnapshot flussTableLakeSnapshot =
-                new FlussTableLakeSnapshot(tableId, 
committedLakeSnapshot.getLakeSnapshotId());
-        for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
-                committedLakeSnapshot.getLogEndOffsets().entrySet()) {
-            Tuple2<Long, Integer> partitionBucket = entry.getKey();
-            Long partitionId = partitionBucket.f0;
-            TableBucket tableBucket = new TableBucket(tableId, partitionId, 
partitionBucket.f1);
-            flussTableLakeSnapshot.addBucketOffset(tableBucket, 
entry.getValue());
+                new FlussTableLakeSnapshot(tableId, snapshotId);
+        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
+            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), 
entry.getValue());
         }
         commit(flussTableLakeSnapshot);
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index a678a4b23..9d60c8899 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -34,10 +34,11 @@ import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 
@@ -86,6 +87,8 @@ public class TieringCommitOperator<WriteResult, Committable>
         implements OneInputStreamOperator<
                 TableBucketWriteResult<WriteResult>, 
CommittableMessage<Committable>> {
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private static final long serialVersionUID = 1L;
 
     private final Configuration flussConfig;
@@ -223,6 +226,7 @@ public class TieringCommitOperator<WriteResult, Committable>
             Committable committable = 
lakeCommitter.toCommittable(writeResults);
             // before commit to lake, check fluss not missing any lake 
snapshot committed by fluss
             checkFlussNotMissingLakeSnapshot(
+                    tableId,
                     tablePath,
                     lakeCommitter,
                     committable,
@@ -303,6 +307,7 @@ public class TieringCommitOperator<WriteResult, Committable>
     }
 
     private void checkFlussNotMissingLakeSnapshot(
+            long tableId,
             TablePath tablePath,
             LakeCommitter<WriteResult, Committable> lakeCommitter,
             Committable committable,
@@ -318,10 +323,33 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         // known lake snapshot, which means the data already has been 
committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
+            if (missingCommittedSnapshot.getSnapshotProperties() == null
+                    || missingCommittedSnapshot
+                                    .getSnapshotProperties()
+                                    
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
+                            == null) {
+                throw new IllegalStateException(
+                        String.format(
                                "Missing required log offsets property '%s' in 
lake snapshot %d for table: 'tablePath=%s, tableId=%d'. "
+                                        + "This property is required to commit 
the missing snapshot to Fluss. "
+                                        + "The snapshot may have been created 
by an older version of Fluss that did not store this information, "
+                                        + "or the snapshot properties may be 
corrupted.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                missingCommittedSnapshot.getLakeSnapshotId(),
+                                tablePath,
+                                tableId));
+            }
+
+            String logOffsetsProperty =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
             // commit this missing snapshot to fluss
-            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
             flussTableLakeSnapshotCommitter.commit(
-                    tableInfo.getTableId(), missingCommittedSnapshot);
+                    tableId,
+                    missingCommittedSnapshot.getLakeSnapshotId(),
+                    fromLogOffsetProperty(tableId, logOffsetsProperty));
             // abort this committable to delete the written files
             lakeCommitter.abort(committable);
             throw new IllegalStateException(
@@ -331,12 +359,25 @@ public class TieringCommitOperator<WriteResult, 
Committable>
                                     + " missing snapshot: %s.",
                             flussCurrentLakeSnapshot,
                             missingCommittedSnapshot.getLakeSnapshotId(),
-                            tableInfo.getTablePath(),
-                            tableInfo.getTableId(),
+                            tablePath,
+                            tableId,
                             missingCommittedSnapshot));
         }
     }
 
+    public static Map<TableBucket, Long> fromLogOffsetProperty(
+            long tableId, String logOffsetsProperty) throws IOException {
+        Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+        for (JsonNode node : OBJECT_MAPPER.readTree(logOffsetsProperty)) {
+            BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            TableBucket tableBucket =
+                    new TableBucket(
+                            tableId, bucketOffset.getPartitionId(), 
bucketOffset.getBucket());
+            logEndOffsets.put(tableBucket, bucketOffset.getLogOffset());
+        }
+        return logEndOffsets;
+    }
+
     private void registerTableBucketWriteResult(
             long tableId, TableBucketWriteResult<WriteResult> 
tableBucketWriteResult) {
         collectedTableBucketWriteResults
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 927d69e2b..f8d71e6cf 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -19,7 +19,6 @@ package org.apache.fluss.flink.tiering.committer;
 
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.flink.utils.FlinkTestBase;
-import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 
@@ -71,7 +70,6 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
 
         List<String> partitions;
         Map<String, Long> partitionNameAndIds = new HashMap<>();
-        Map<Long, String> expectedPartitionNameById = new HashMap<>();
         if (!isPartitioned) {
             FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
             partitions = Collections.singletonList(null);
@@ -80,32 +78,27 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
             partitions = new ArrayList<>(partitionNameAndIds.keySet());
         }
 
-        CommittedLakeSnapshot committedLakeSnapshot = new 
CommittedLakeSnapshot(3);
-
-        Map<TableBucket, Long> expectedOffsets = new HashMap<>();
+        Map<TableBucket, Long> logEndOffsets = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             long bucketOffset = bucket * bucket;
             for (String partitionName : partitions) {
                 if (partitionName == null) {
-                    committedLakeSnapshot.addBucket(bucket, bucketOffset);
-                    expectedOffsets.put(new TableBucket(tableId, bucket), 
bucketOffset);
+                    logEndOffsets.put(new TableBucket(tableId, bucket), 
bucketOffset);
                 } else {
                     long partitionId = partitionNameAndIds.get(partitionName);
-                    committedLakeSnapshot.addPartitionBucket(partitionId, 
bucket, bucketOffset);
-                    expectedOffsets.put(
-                            new TableBucket(tableId, partitionId, bucket), 
bucketOffset);
-                    expectedPartitionNameById.put(partitionId, partitionName);
+                    logEndOffsets.put(new TableBucket(tableId, partitionId, 
bucket), bucketOffset);
                 }
             }
         }
 
+        long snapshotId = 3;
         // commit offsets
-        flussTableLakeSnapshotCommitter.commit(tableId, committedLakeSnapshot);
+        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, 
logEndOffsets);
         LakeSnapshot lakeSnapshot = 
admin.getLatestLakeSnapshot(tablePath).get();
         assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
 
         // get and check the offsets
         Map<TableBucket, Long> bucketLogOffsets = 
lakeSnapshot.getTableBucketsOffset();
-        assertThat(bucketLogOffsets).isEqualTo(expectedOffsets);
+        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 230460395..0be879e9f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -28,7 +28,6 @@ import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -50,12 +49,16 @@ import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
+import static 
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static 
org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -254,8 +257,12 @@ class TieringCommitOperatorTest extends FlinkTestBase {
 
     @Test
     void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", 
"test_commit_when_fluss_missing_lake_snapshot");
+        long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+        int numberOfWriteResults = 3;
+
         CommittedLakeSnapshot mockCommittedSnapshot =
-                mockCommittedLakeSnapshot(Collections.singletonList(null), 2);
+                mockCommittedLakeSnapshot(Collections.singletonList(null), 
tableId, 2);
         TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
                 new 
TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
         committerOperator =
@@ -266,10 +273,6 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                         new TestingLakeTieringFactory(testingLakeCommitter));
         committerOperator.open();
 
-        TablePath tablePath = TablePath.of("fluss", 
"test_commit_when_fluss_missing_lake_snapshot");
-        long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
-        int numberOfWriteResults = 3;
-
         for (int bucket = 0; bucket < 3; bucket++) {
             TableBucket tableBucket = new TableBucket(tableId, bucket);
             committerOperator.processElement(
@@ -316,7 +319,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
 
         CommittedLakeSnapshot mockCommittedSnapshot =
-                mockCommittedLakeSnapshot(Collections.singletonList(null), 3);
+                mockCommittedLakeSnapshot(Collections.singletonList(null), 
tableId, 3);
         TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
                 new 
TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
         committerOperator =
@@ -359,36 +362,24 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                         mockCommittedSnapshot));
     }
 
-    private CommittedLakeSnapshot mockCommittedLakeSnapshot(List<Long> 
partitions, int snapshotId) {
-        CommittedLakeSnapshot mockCommittedSnapshot = new 
CommittedLakeSnapshot(snapshotId);
+    private CommittedLakeSnapshot mockCommittedLakeSnapshot(
+            List<Long> partitions, long tableId, int snapshotId) throws 
IOException {
+        Map<TableBucket, Long> logEndOffsets = new HashMap<>();
         for (Long partition : partitions) {
             for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
-                if (partition == null) {
-                    mockCommittedSnapshot.addBucket(bucket, bucket + 1);
-                } else {
-                    mockCommittedSnapshot.addPartitionBucket(partition, 
bucket, bucket + 1);
-                }
+                logEndOffsets.put(new TableBucket(tableId, partition, bucket), 
bucket + 1L);
             }
         }
-        return mockCommittedSnapshot;
+        return new CommittedLakeSnapshot(snapshotId, 
toBucketOffsetsProperty(logEndOffsets));
     }
 
     private Map<TableBucket, Long> getExpectedLogEndOffsets(
-            long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
-        Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
-        for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
-                committedLakeSnapshot.getLogEndOffsets().entrySet()) {
-            Tuple2<Long, Integer> partitionBucket = entry.getKey();
-            if (partitionBucket.f0 == null) {
-                expectedLogEndOffsets.put(
-                        new TableBucket(tableId, partitionBucket.f1), 
entry.getValue());
-            } else {
-                expectedLogEndOffsets.put(
-                        new TableBucket(tableId, partitionBucket.f0, 
partitionBucket.f1),
-                        entry.getValue());
-            }
-        }
-        return expectedLogEndOffsets;
+            long tableId, CommittedLakeSnapshot committedLakeSnapshot) throws 
IOException {
+        return fromLogOffsetProperty(
+                tableId,
+                committedLakeSnapshot
+                        .getSnapshotProperties()
+                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
     }
 
     private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 935d06231..d07c9eb91 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -17,14 +17,10 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
-import org.apache.fluss.lake.committer.BucketOffset;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
 import org.apache.fluss.metadata.TablePath;
-import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogUtil;
@@ -52,7 +48,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 import static 
org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -66,7 +61,6 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
 
     private final Catalog icebergCatalog;
     private final Table icebergTable;
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final ThreadLocal<Long> currentCommitSnapshotId = new 
ThreadLocal<>();
 
     public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider, 
TablePath tablePath)
@@ -222,7 +216,6 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
     @Override
     public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long 
latestLakeSnapshotIdOfFluss)
             throws IOException {
-        // todo: may refactor to common methods?
         Snapshot latestLakeSnapshot =
                 
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
 
@@ -246,9 +239,6 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
             }
         }
 
-        CommittedLakeSnapshot committedLakeSnapshot =
-                new CommittedLakeSnapshot(latestLakeSnapshot.snapshotId());
-
         // Reconstruct bucket offsets from snapshot properties
         Map<String, String> properties = latestLakeSnapshot.summary();
         if (properties == null) {
@@ -256,27 +246,7 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
                     "Failed to load committed lake snapshot properties from 
Iceberg.");
         }
 
-        String flussOffsetProperties = 
properties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
-        if (flussOffsetProperties == null) {
-            throw new IllegalArgumentException(
-                    "Cannot resume tiering from snapshot without bucket offset 
properties. "
-                            + "The snapshot was committed to Iceberg but 
missing Fluss metadata.");
-        }
-
-        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
-            BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
-            if (bucketOffset.getPartitionId() != null) {
-                committedLakeSnapshot.addPartitionBucket(
-                        bucketOffset.getPartitionId(),
-                        bucketOffset.getBucket(),
-                        bucketOffset.getLogOffset());
-            } else {
-                committedLakeSnapshot.addBucket(
-                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
-            }
-        }
-
-        return committedLakeSnapshot;
+        return new CommittedLakeSnapshot(latestLakeSnapshot.snapshotId(), 
properties);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
index 13cb112db..1aed7810e 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
@@ -18,15 +18,11 @@
 package org.apache.fluss.lake.lance.tiering;
 
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.committer.BucketOffset;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.lance.LanceConfig;
 import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
 import org.apache.fluss.metadata.TablePath;
-import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 import org.apache.fluss.utils.types.Tuple2;
 
 import com.lancedb.lance.Dataset;
@@ -45,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static 
org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 
 /** Implementation of {@link LakeCommitter} for Lance. */
@@ -53,7 +48,6 @@ public class LanceLakeCommitter implements 
LakeCommitter<LanceWriteResult, Lance
     private final LanceConfig config;
     private static final String committerName = "commit-user";
     private final RootAllocator allocator = new RootAllocator();
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public LanceLakeCommitter(Configuration options, TablePath tablePath) {
         this.config =
@@ -109,26 +103,9 @@ public class LanceLakeCommitter implements 
LakeCommitter<LanceWriteResult, Lance
             return null;
         }
 
-        CommittedLakeSnapshot committedLakeSnapshot =
-                new 
CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.f0.getId());
-        String flussOffsetProperties =
-                latestLakeSnapshotIdOfLake
-                        .f1
-                        .transactionProperties()
-                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
-        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
-            BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
-            if (bucketOffset.getPartitionId() != null) {
-                committedLakeSnapshot.addPartitionBucket(
-                        bucketOffset.getPartitionId(),
-                        bucketOffset.getBucket(),
-                        bucketOffset.getLogOffset());
-            } else {
-                committedLakeSnapshot.addBucket(
-                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
-            }
-        }
-        return committedLakeSnapshot;
+        return new CommittedLakeSnapshot(
+                latestLakeSnapshotIdOfLake.f0.getId(),
+                latestLakeSnapshotIdOfLake.f1.transactionProperties());
     }
 
     @Nullable
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 1157daf76..1cfac3723 100644
--- 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -64,7 +64,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static 
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
 import static 
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
@@ -201,11 +203,19 @@ class LanceTieringTest {
             // use snapshot id 1 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = 
lakeCommitter.getMissingLakeSnapshot(1L);
             assertThat(committedLakeSnapshot).isNotNull();
-            Map<Tuple2<Long, Integer>, Long> offsets = 
committedLakeSnapshot.getLogEndOffsets();
+            long tableId = tableInfo.getTableId();
+            Map<TableBucket, Long> offsets =
+                    fromLogOffsetProperty(
+                            tableInfo.getTableId(),
+                            committedLakeSnapshot
+                                    .getSnapshotProperties()
+                                    
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+
             for (int bucket = 0; bucket < 3; bucket++) {
                 for (Long partitionId : partitionIdAndName.keySet()) {
                     // we only write 10 records, so expected log offset should 
be 10
-                    assertThat(offsets.get(Tuple2.of(partitionId, 
bucket))).isEqualTo(10);
+                    assertThat(offsets.get(new TableBucket(tableId, 
partitionId, bucket)))
+                            .isEqualTo(10);
                 }
             }
             
assertThat(committedLakeSnapshot.getLakeSnapshotId()).isEqualTo(2L);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index ee0419312..72790bdcb 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -18,14 +18,10 @@
 package org.apache.fluss.lake.paimon.tiering;
 
 import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.lake.committer.BucketOffset;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.metadata.TablePath;
-import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
@@ -46,7 +42,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static 
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -60,7 +55,6 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
     private TableCommitImpl tableCommit;
 
     private static final ThreadLocal<Long> currentCommitSnapshotId = new 
ThreadLocal<>();
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public PaimonLakeCommitter(
             PaimonCatalogProvider paimonCatalogProvider, CommitterInitContext 
committerInitContext)
@@ -135,9 +129,6 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
             return null;
         }
 
-        CommittedLakeSnapshot committedLakeSnapshot =
-                new CommittedLakeSnapshot(latestLakeSnapshotOfLake.id());
-
         if (latestLakeSnapshotOfLake.properties() == null) {
             throw new IOException("Failed to load committed lake snapshot 
properties from Paimon.");
         }
@@ -155,23 +146,8 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
                             + "To resolve this:\n"
                             + "1. Run the old tiering service(v0.7) again to 
complete the Fluss commit\n"
                             + "2. Then you can resume tiering with the newer 
version of tiering service");
-        } else {
-            String flussOffsetProperties =
-                    
lakeSnapshotProperties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
-            for (JsonNode node : 
OBJECT_MAPPER.readTree(flussOffsetProperties)) {
-                BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
-                if (bucketOffset.getPartitionId() != null) {
-                    committedLakeSnapshot.addPartitionBucket(
-                            bucketOffset.getPartitionId(),
-                            bucketOffset.getBucket(),
-                            bucketOffset.getLogOffset());
-                } else {
-                    committedLakeSnapshot.addBucket(
-                            bucketOffset.getBucket(), 
bucketOffset.getLogOffset());
-                }
-            }
         }
-        return committedLakeSnapshot;
+        return new CommittedLakeSnapshot(latestLakeSnapshotOfLake.id(), 
lakeSnapshotProperties);
     }
 
     @Nullable
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 49f80d999..b1b4adc9c 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -71,6 +71,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static 
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
 import static 
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
 import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
@@ -188,11 +189,18 @@ class PaimonTieringTest {
             // use snapshot id 0 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = 
lakeCommitter.getMissingLakeSnapshot(0L);
             assertThat(committedLakeSnapshot).isNotNull();
-            Map<Tuple2<Long, Integer>, Long> offsets = 
committedLakeSnapshot.getLogEndOffsets();
+            long tableId = tableInfo.getTableId();
+            Map<TableBucket, Long> offsets =
+                    fromLogOffsetProperty(
+                            tableId,
+                            committedLakeSnapshot
+                                    .getSnapshotProperties()
+                                    
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
             for (int bucket = 0; bucket < 3; bucket++) {
                 for (Long partitionId : partitionIdAndName.keySet()) {
                     // we only write 10 records, so expected log offset should 
be 10
-                    assertThat(offsets.get(Tuple2.of(partitionId, 
bucket))).isEqualTo(10);
+                    assertThat(offsets.get(new TableBucket(tableId, 
partitionId, bucket)))
+                            .isEqualTo(10);
                 }
             }
             assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();


Reply via email to