This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new de2d9a042 [lake] Move parse offsets from snapshot property to common
method (#2208)
de2d9a042 is described below
commit de2d9a042637386f9c8f59bddbff8a1a66853ca0
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Dec 22 16:17:43 2025 +0800
[lake] Move parse offsets from snapshot property to common method (#2208)
---
.../lake/committer/CommittedLakeSnapshot.java | 35 +++++----------
.../committer/FlussTableLakeSnapshotCommitter.java | 14 ++----
.../tiering/committer/TieringCommitOperator.java | 51 +++++++++++++++++++---
.../FlussTableLakeSnapshotCommitterTest.java | 19 +++-----
.../committer/TieringCommitOperatorTest.java | 51 +++++++++-------------
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 32 +-------------
.../lake/lance/tiering/LanceLakeCommitter.java | 29 ++----------
.../fluss/lake/lance/tiering/LanceTieringTest.java | 14 +++++-
.../lake/paimon/tiering/PaimonLakeCommitter.java | 26 +----------
.../lake/paimon/tiering/PaimonTieringTest.java | 12 ++++-
10 files changed, 116 insertions(+), 167 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
index 7b9e2f4cc..dfc67b2e4 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
@@ -17,41 +17,30 @@
package org.apache.fluss.lake.committer;
-import org.apache.fluss.utils.types.Tuple2;
-
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
- * The lake already committed snapshot, containing the lake snapshot id and
the bucket end offsets
- * in this snapshot.
+ * The lake already committed snapshot, containing the lake snapshot id and
the properties stored in
+ * this snapshot.
*/
public class CommittedLakeSnapshot {
private final long lakeSnapshotId;
- // <partition_id, bucket> -> log offset, partition_id will be null if it's
not a
- // partition bucket
- private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new
HashMap<>();
- public CommittedLakeSnapshot(long lakeSnapshotId) {
+ private final Map<String, String> snapshotProperties;
+
+ public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String>
snapshotProperties) {
this.lakeSnapshotId = lakeSnapshotId;
+ this.snapshotProperties = snapshotProperties;
}
public long getLakeSnapshotId() {
return lakeSnapshotId;
}
- public void addBucket(int bucketId, long offset) {
- logEndOffsets.put(Tuple2.of(null, bucketId), offset);
- }
-
- public void addPartitionBucket(Long partitionId, int bucketId, long
offset) {
- logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
- }
-
- public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
- return logEndOffsets;
+ public Map<String, String> getSnapshotProperties() {
+ return snapshotProperties;
}
@Override
@@ -64,12 +53,12 @@ public class CommittedLakeSnapshot {
}
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
return lakeSnapshotId == that.lakeSnapshotId
- && Objects.equals(logEndOffsets, that.logEndOffsets);
+ && Objects.equals(snapshotProperties, that.snapshotProperties);
}
@Override
public int hashCode() {
- return Objects.hash(lakeSnapshotId, logEndOffsets);
+ return Objects.hash(lakeSnapshotId, snapshotProperties);
}
@Override
@@ -77,8 +66,8 @@ public class CommittedLakeSnapshot {
return "CommittedLakeSnapshot{"
+ "lakeSnapshotId="
+ lakeSnapshotId
- + ", logEndOffsets="
- + logEndOffsets
+ + ", snapshotProperties="
+ + snapshotProperties
+ '}';
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index b78614dc4..f4be12593 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -20,7 +20,6 @@ package org.apache.fluss.flink.tiering.committer;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -31,7 +30,6 @@ import
org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.utils.ExceptionUtils;
-import org.apache.fluss.utils.types.Tuple2;
import java.io.IOException;
import java.util.Map;
@@ -75,17 +73,13 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
}
}
- public void commit(long tableId, CommittedLakeSnapshot
committedLakeSnapshot)
+ public void commit(long tableId, long snapshotId, Map<TableBucket, Long>
logEndOffsets)
throws IOException {
// construct lake snapshot to commit to Fluss
FlussTableLakeSnapshot flussTableLakeSnapshot =
- new FlussTableLakeSnapshot(tableId,
committedLakeSnapshot.getLakeSnapshotId());
- for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
- committedLakeSnapshot.getLogEndOffsets().entrySet()) {
- Tuple2<Long, Integer> partitionBucket = entry.getKey();
- Long partitionId = partitionBucket.f0;
- TableBucket tableBucket = new TableBucket(tableId, partitionId,
partitionBucket.f1);
- flussTableLakeSnapshot.addBucketOffset(tableBucket,
entry.getValue());
+ new FlussTableLakeSnapshot(tableId, snapshotId);
+ for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
+ flussTableLakeSnapshot.addBucketOffset(entry.getKey(),
entry.getValue());
}
commit(flussTableLakeSnapshot);
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index a678a4b23..9d60c8899 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -34,10 +34,11 @@ import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
@@ -86,6 +87,8 @@ public class TieringCommitOperator<WriteResult, Committable>
implements OneInputStreamOperator<
TableBucketWriteResult<WriteResult>,
CommittableMessage<Committable>> {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final long serialVersionUID = 1L;
private final Configuration flussConfig;
@@ -223,6 +226,7 @@ public class TieringCommitOperator<WriteResult, Committable>
Committable committable =
lakeCommitter.toCommittable(writeResults);
// before commit to lake, check fluss not missing any lake
snapshot committed by fluss
checkFlussNotMissingLakeSnapshot(
+ tableId,
tablePath,
lakeCommitter,
committable,
@@ -303,6 +307,7 @@ public class TieringCommitOperator<WriteResult, Committable>
}
private void checkFlussNotMissingLakeSnapshot(
+ long tableId,
TablePath tablePath,
LakeCommitter<WriteResult, Committable> lakeCommitter,
Committable committable,
@@ -318,10 +323,33 @@ public class TieringCommitOperator<WriteResult,
Committable>
// known lake snapshot, which means the data already has been
committed to lake,
// not to commit to lake to avoid data duplicated
if (missingCommittedSnapshot != null) {
+ if (missingCommittedSnapshot.getSnapshotProperties() == null
+ || missingCommittedSnapshot
+ .getSnapshotProperties()
+
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
+ == null) {
+ throw new IllegalStateException(
+ String.format(
"Missing required log offsets property '%s' in
lake snapshot %d for table: 'tablePath=%s, tableId=%d'. "
+ + "This property is required to commit
the missing snapshot to Fluss. "
+ + "The snapshot may have been created
by an older version of Fluss that did not store this information, "
+ + "or the snapshot properties may be
corrupted.",
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ missingCommittedSnapshot.getLakeSnapshotId(),
+ tablePath,
+ tableId));
+ }
+
+ String logOffsetsProperty =
+ missingCommittedSnapshot
+ .getSnapshotProperties()
+ .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
// commit this missing snapshot to fluss
- TableInfo tableInfo = admin.getTableInfo(tablePath).get();
flussTableLakeSnapshotCommitter.commit(
- tableInfo.getTableId(), missingCommittedSnapshot);
+ tableId,
+ missingCommittedSnapshot.getLakeSnapshotId(),
+ fromLogOffsetProperty(tableId, logOffsetsProperty));
// abort this committable to delete the written files
lakeCommitter.abort(committable);
throw new IllegalStateException(
@@ -331,12 +359,25 @@ public class TieringCommitOperator<WriteResult,
Committable>
+ " missing snapshot: %s.",
flussCurrentLakeSnapshot,
missingCommittedSnapshot.getLakeSnapshotId(),
- tableInfo.getTablePath(),
- tableInfo.getTableId(),
+ tablePath,
+ tableId,
missingCommittedSnapshot));
}
}
+ public static Map<TableBucket, Long> fromLogOffsetProperty(
+ long tableId, String logOffsetsProperty) throws IOException {
+ Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+ for (JsonNode node : OBJECT_MAPPER.readTree(logOffsetsProperty)) {
+ BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+ TableBucket tableBucket =
+ new TableBucket(
+ tableId, bucketOffset.getPartitionId(),
bucketOffset.getBucket());
+ logEndOffsets.put(tableBucket, bucketOffset.getLogOffset());
+ }
+ return logEndOffsets;
+ }
+
private void registerTableBucketWriteResult(
long tableId, TableBucketWriteResult<WriteResult>
tableBucketWriteResult) {
collectedTableBucketWriteResults
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 927d69e2b..f8d71e6cf 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -19,7 +19,6 @@ package org.apache.fluss.flink.tiering.committer;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.flink.utils.FlinkTestBase;
-import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
@@ -71,7 +70,6 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
List<String> partitions;
Map<String, Long> partitionNameAndIds = new HashMap<>();
- Map<Long, String> expectedPartitionNameById = new HashMap<>();
if (!isPartitioned) {
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
partitions = Collections.singletonList(null);
@@ -80,32 +78,27 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
partitions = new ArrayList<>(partitionNameAndIds.keySet());
}
- CommittedLakeSnapshot committedLakeSnapshot = new
CommittedLakeSnapshot(3);
-
- Map<TableBucket, Long> expectedOffsets = new HashMap<>();
+ Map<TableBucket, Long> logEndOffsets = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
long bucketOffset = bucket * bucket;
for (String partitionName : partitions) {
if (partitionName == null) {
- committedLakeSnapshot.addBucket(bucket, bucketOffset);
- expectedOffsets.put(new TableBucket(tableId, bucket),
bucketOffset);
+ logEndOffsets.put(new TableBucket(tableId, bucket),
bucketOffset);
} else {
long partitionId = partitionNameAndIds.get(partitionName);
- committedLakeSnapshot.addPartitionBucket(partitionId,
bucket, bucketOffset);
- expectedOffsets.put(
- new TableBucket(tableId, partitionId, bucket),
bucketOffset);
- expectedPartitionNameById.put(partitionId, partitionName);
+ logEndOffsets.put(new TableBucket(tableId, partitionId,
bucket), bucketOffset);
}
}
}
+ long snapshotId = 3;
// commit offsets
- flussTableLakeSnapshotCommitter.commit(tableId, committedLakeSnapshot);
+ flussTableLakeSnapshotCommitter.commit(tableId, snapshotId,
logEndOffsets);
LakeSnapshot lakeSnapshot =
admin.getLatestLakeSnapshot(tablePath).get();
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
// get and check the offsets
Map<TableBucket, Long> bucketLogOffsets =
lakeSnapshot.getTableBucketsOffset();
- assertThat(bucketLogOffsets).isEqualTo(expectedOffsets);
+ assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 230460395..0be879e9f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -28,7 +28,6 @@ import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.utils.types.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -50,12 +49,16 @@ import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
+import static
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static
org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -254,8 +257,12 @@ class TieringCommitOperatorTest extends FlinkTestBase {
@Test
void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
+ TablePath tablePath = TablePath.of("fluss",
"test_commit_when_fluss_missing_lake_snapshot");
+ long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+ int numberOfWriteResults = 3;
+
CommittedLakeSnapshot mockCommittedSnapshot =
- mockCommittedLakeSnapshot(Collections.singletonList(null), 2);
+ mockCommittedLakeSnapshot(Collections.singletonList(null),
tableId, 2);
TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
new
TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
committerOperator =
@@ -266,10 +273,6 @@ class TieringCommitOperatorTest extends FlinkTestBase {
new TestingLakeTieringFactory(testingLakeCommitter));
committerOperator.open();
- TablePath tablePath = TablePath.of("fluss",
"test_commit_when_fluss_missing_lake_snapshot");
- long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
- int numberOfWriteResults = 3;
-
for (int bucket = 0; bucket < 3; bucket++) {
TableBucket tableBucket = new TableBucket(tableId, bucket);
committerOperator.processElement(
@@ -316,7 +319,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
CommittedLakeSnapshot mockCommittedSnapshot =
- mockCommittedLakeSnapshot(Collections.singletonList(null), 3);
+ mockCommittedLakeSnapshot(Collections.singletonList(null),
tableId, 3);
TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
new
TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
committerOperator =
@@ -359,36 +362,24 @@ class TieringCommitOperatorTest extends FlinkTestBase {
mockCommittedSnapshot));
}
- private CommittedLakeSnapshot mockCommittedLakeSnapshot(List<Long>
partitions, int snapshotId) {
- CommittedLakeSnapshot mockCommittedSnapshot = new
CommittedLakeSnapshot(snapshotId);
+ private CommittedLakeSnapshot mockCommittedLakeSnapshot(
+ List<Long> partitions, long tableId, int snapshotId) throws
IOException {
+ Map<TableBucket, Long> logEndOffsets = new HashMap<>();
for (Long partition : partitions) {
for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
- if (partition == null) {
- mockCommittedSnapshot.addBucket(bucket, bucket + 1);
- } else {
- mockCommittedSnapshot.addPartitionBucket(partition,
bucket, bucket + 1);
- }
+ logEndOffsets.put(new TableBucket(tableId, partition, bucket),
bucket + 1L);
}
}
- return mockCommittedSnapshot;
+ return new CommittedLakeSnapshot(snapshotId,
toBucketOffsetsProperty(logEndOffsets));
}
private Map<TableBucket, Long> getExpectedLogEndOffsets(
- long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
- Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
- for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
- committedLakeSnapshot.getLogEndOffsets().entrySet()) {
- Tuple2<Long, Integer> partitionBucket = entry.getKey();
- if (partitionBucket.f0 == null) {
- expectedLogEndOffsets.put(
- new TableBucket(tableId, partitionBucket.f1),
entry.getValue());
- } else {
- expectedLogEndOffsets.put(
- new TableBucket(tableId, partitionBucket.f0,
partitionBucket.f1),
- entry.getValue());
- }
- }
- return expectedLogEndOffsets;
+ long tableId, CommittedLakeSnapshot committedLakeSnapshot) throws
IOException {
+ return fromLogOffsetProperty(
+ tableId,
+ committedLakeSnapshot
+ .getSnapshotProperties()
+ .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
}
private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 935d06231..d07c9eb91 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -17,14 +17,10 @@
package org.apache.fluss.lake.iceberg.tiering;
-import org.apache.fluss.lake.committer.BucketOffset;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
import org.apache.fluss.metadata.TablePath;
-import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
@@ -52,7 +48,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
import static
org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -66,7 +61,6 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
private final Catalog icebergCatalog;
private final Table icebergTable;
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider,
TablePath tablePath)
@@ -222,7 +216,6 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
@Override
public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long
latestLakeSnapshotIdOfFluss)
throws IOException {
- // todo: may refactor to common methods?
Snapshot latestLakeSnapshot =
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
@@ -246,9 +239,6 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
}
}
- CommittedLakeSnapshot committedLakeSnapshot =
- new CommittedLakeSnapshot(latestLakeSnapshot.snapshotId());
-
// Reconstruct bucket offsets from snapshot properties
Map<String, String> properties = latestLakeSnapshot.summary();
if (properties == null) {
@@ -256,27 +246,7 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
"Failed to load committed lake snapshot properties from
Iceberg.");
}
- String flussOffsetProperties =
properties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
- if (flussOffsetProperties == null) {
- throw new IllegalArgumentException(
- "Cannot resume tiering from snapshot without bucket offset
properties. "
- + "The snapshot was committed to Iceberg but
missing Fluss metadata.");
- }
-
- for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
- BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
- if (bucketOffset.getPartitionId() != null) {
- committedLakeSnapshot.addPartitionBucket(
- bucketOffset.getPartitionId(),
- bucketOffset.getBucket(),
- bucketOffset.getLogOffset());
- } else {
- committedLakeSnapshot.addBucket(
- bucketOffset.getBucket(), bucketOffset.getLogOffset());
- }
- }
-
- return committedLakeSnapshot;
+ return new CommittedLakeSnapshot(latestLakeSnapshot.snapshotId(),
properties);
}
@Override
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
index 13cb112db..1aed7810e 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
@@ -18,15 +18,11 @@
package org.apache.fluss.lake.lance.tiering;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.committer.BucketOffset;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
import org.apache.fluss.metadata.TablePath;
-import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.fluss.utils.types.Tuple2;
import com.lancedb.lance.Dataset;
@@ -45,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static
org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
/** Implementation of {@link LakeCommitter} for Lance. */
@@ -53,7 +48,6 @@ public class LanceLakeCommitter implements
LakeCommitter<LanceWriteResult, Lance
private final LanceConfig config;
private static final String committerName = "commit-user";
private final RootAllocator allocator = new RootAllocator();
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public LanceLakeCommitter(Configuration options, TablePath tablePath) {
this.config =
@@ -109,26 +103,9 @@ public class LanceLakeCommitter implements
LakeCommitter<LanceWriteResult, Lance
return null;
}
- CommittedLakeSnapshot committedLakeSnapshot =
- new
CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.f0.getId());
- String flussOffsetProperties =
- latestLakeSnapshotIdOfLake
- .f1
- .transactionProperties()
- .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
- for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
- BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
- if (bucketOffset.getPartitionId() != null) {
- committedLakeSnapshot.addPartitionBucket(
- bucketOffset.getPartitionId(),
- bucketOffset.getBucket(),
- bucketOffset.getLogOffset());
- } else {
- committedLakeSnapshot.addBucket(
- bucketOffset.getBucket(), bucketOffset.getLogOffset());
- }
- }
- return committedLakeSnapshot;
+ return new CommittedLakeSnapshot(
+ latestLakeSnapshotIdOfLake.f0.getId(),
+ latestLakeSnapshotIdOfLake.f1.transactionProperties());
}
@Nullable
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 1157daf76..1cfac3723 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -64,7 +64,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
import static
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static org.assertj.core.api.Assertions.assertThat;
/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
@@ -201,11 +203,19 @@ class LanceTieringTest {
// use snapshot id 1 as the known snapshot id
CommittedLakeSnapshot committedLakeSnapshot =
lakeCommitter.getMissingLakeSnapshot(1L);
assertThat(committedLakeSnapshot).isNotNull();
- Map<Tuple2<Long, Integer>, Long> offsets =
committedLakeSnapshot.getLogEndOffsets();
+ long tableId = tableInfo.getTableId();
+ Map<TableBucket, Long> offsets =
+ fromLogOffsetProperty(
+ tableInfo.getTableId(),
+ committedLakeSnapshot
+ .getSnapshotProperties()
+
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+
for (int bucket = 0; bucket < 3; bucket++) {
for (Long partitionId : partitionIdAndName.keySet()) {
// we only write 10 records, so expected log offset should
be 10
- assertThat(offsets.get(Tuple2.of(partitionId,
bucket))).isEqualTo(10);
+ assertThat(offsets.get(new TableBucket(tableId,
partitionId, bucket)))
+ .isEqualTo(10);
}
}
assertThat(committedLakeSnapshot.getLakeSnapshotId()).isEqualTo(2L);
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index ee0419312..72790bdcb 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -18,14 +18,10 @@
package org.apache.fluss.lake.paimon.tiering;
import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.lake.committer.BucketOffset;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.metadata.TablePath;
-import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
@@ -46,7 +42,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -60,7 +55,6 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
private TableCommitImpl tableCommit;
private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public PaimonLakeCommitter(
PaimonCatalogProvider paimonCatalogProvider, CommitterInitContext
committerInitContext)
@@ -135,9 +129,6 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
return null;
}
- CommittedLakeSnapshot committedLakeSnapshot =
- new CommittedLakeSnapshot(latestLakeSnapshotOfLake.id());
-
if (latestLakeSnapshotOfLake.properties() == null) {
throw new IOException("Failed to load committed lake snapshot
properties from Paimon.");
}
@@ -155,23 +146,8 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
+ "To resolve this:\n"
+ "1. Run the old tiering service(v0.7) again to
complete the Fluss commit\n"
+ "2. Then you can resume tiering with the newer
version of tiering service");
- } else {
- String flussOffsetProperties =
-
lakeSnapshotProperties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
- for (JsonNode node :
OBJECT_MAPPER.readTree(flussOffsetProperties)) {
- BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
- if (bucketOffset.getPartitionId() != null) {
- committedLakeSnapshot.addPartitionBucket(
- bucketOffset.getPartitionId(),
- bucketOffset.getBucket(),
- bucketOffset.getLogOffset());
- } else {
- committedLakeSnapshot.addBucket(
- bucketOffset.getBucket(),
bucketOffset.getLogOffset());
- }
- }
}
- return committedLakeSnapshot;
+ return new CommittedLakeSnapshot(latestLakeSnapshotOfLake.id(),
lakeSnapshotProperties);
}
@Nullable
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 49f80d999..b1b4adc9c 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -71,6 +71,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
import static
org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
import static
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
@@ -188,11 +189,18 @@ class PaimonTieringTest {
// use snapshot id 0 as the known snapshot id
CommittedLakeSnapshot committedLakeSnapshot =
lakeCommitter.getMissingLakeSnapshot(0L);
assertThat(committedLakeSnapshot).isNotNull();
- Map<Tuple2<Long, Integer>, Long> offsets =
committedLakeSnapshot.getLogEndOffsets();
+ long tableId = tableInfo.getTableId();
+ Map<TableBucket, Long> offsets =
+ fromLogOffsetProperty(
+ tableId,
+ committedLakeSnapshot
+ .getSnapshotProperties()
+
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
for (int bucket = 0; bucket < 3; bucket++) {
for (Long partitionId : partitionIdAndName.keySet()) {
// we only write 10 records, so expected log offset should
be 10
- assertThat(offsets.get(Tuple2.of(partitionId,
bucket))).isEqualTo(10);
+ assertThat(offsets.get(new TableBucket(tableId,
partitionId, bucket)))
+ .isEqualTo(10);
}
}
assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();