luoyuxia commented on code in PR #1700:
URL: https://github.com/apache/fluss/pull/1700#discussion_r2352037202
##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java:
##########
@@ -22,23 +22,29 @@
import java.io.Serializable;
import java.util.Objects;
-/** The bucket offset information to be expected to be stored in Lake's snapshot property. */
+/**
+ * The bucket offset and timestamp information to be expected to be stored in Lake's snapshot
+ * property.
+ */
public class BucketOffset implements Serializable {
private static final long serialVersionUID = 1L;
public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";
private final long logOffset;
+ private final long timestamp;
Review Comment:
we also don't need to store the timestamp in `BucketOffset` currently.
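If the suggestion is applied, the class keeps only the log offset. A minimal sketch of what that could look like (constructor and accessor names are assumptions, not the actual Fluss API; visibility is package-private to keep the example self-contained):

```java
import java.io.Serializable;
import java.util.Objects;

/** Sketch of BucketOffset with the timestamp field dropped, per the review suggestion. */
class BucketOffset implements Serializable {

    private static final long serialVersionUID = 1L;

    public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";

    private final long logOffset;

    BucketOffset(long logOffset) {
        this.logOffset = logOffset;
    }

    long logOffset() {
        return logOffset;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BucketOffset)) {
            return false;
        }
        return logOffset == ((BucketOffset) o).logOffset;
    }

    @Override
    public int hashCode() {
        return Objects.hash(logOffset);
    }
}
```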
##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java:
##########
@@ -22,17 +22,21 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
- * The lake already committed snapshot, containing the lake snapshot id and the bucket end offsets
- * in this snapshot.
+ * The lake already committed snapshot, containing the lake snapshot id, the bucket end offsets and
+ * max timestamp in this snapshot.
*/
public class CommittedLakeSnapshot {
private final long lakeSnapshotId;
// <partition_id, bucket> -> log offset, partition_id will be null if it's not a
// partition bucket
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
+ // <partition_id, bucket> -> max timestamp, partition_id will be null if it's not a
+ // partition bucket
+ private final Map<Tuple2<Long, Integer>, Long> maxTimestamps = new HashMap<>();
Review Comment:
we can remove `maxTimestamps`
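With `maxTimestamps` removed, the class would track only the per-bucket end offsets. A self-contained sketch of that shape (`SimpleEntry` stands in for the project's `Tuple2`, and the `addBucket` mutator is an assumed helper, not the actual Fluss API):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

/** Sketch of CommittedLakeSnapshot with the maxTimestamps map removed. */
class CommittedLakeSnapshot {

    private final long lakeSnapshotId;

    // <partition_id, bucket> -> log end offset; partition_id is null if it's not a
    // partition bucket
    private final Map<SimpleEntry<Long, Integer>, Long> logEndOffsets = new HashMap<>();

    CommittedLakeSnapshot(long lakeSnapshotId) {
        this.lakeSnapshotId = lakeSnapshotId;
    }

    // Assumed helper for the sketch: record one bucket's end offset.
    void addBucket(Long partitionId, int bucket, long logEndOffset) {
        logEndOffsets.put(new SimpleEntry<>(partitionId, bucket), logEndOffset);
    }

    long lakeSnapshotId() {
        return lakeSnapshotId;
    }

    Map<SimpleEntry<Long, Integer>, Long> logEndOffsets() {
        return logEndOffsets;
    }
}
```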
##########
fluss-client/src/main/java/org/apache/fluss/client/metadata/LakeSnapshot.java:
##########
@@ -37,16 +37,21 @@ public class LakeSnapshot {
// the specific log offset of the snapshot
private final Map<TableBucket, Long> tableBucketsOffset;
+ // the specific timestamp of the snapshot
+ private final Map<TableBucket, Long> tableBucketsTimestamp;
Review Comment:
currently, we don't need to expose the timestamp here.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -114,13 +120,14 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
- for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry :
- flussTableLakeSnapshot.logEndOffsets().entrySet()) {
+ for (Tuple2<TableBucket, String> bucketPartition :
+ flussTableLakeSnapshot.tablePartitionBuckets()) {
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
pbLakeTableSnapshotInfo.addBucketsReq();
- TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
- String partitionName = bucketEndOffsetEntry.getKey().f1;
- long endOffset = bucketEndOffsetEntry.getValue();
+ TableBucket tableBucket = bucketPartition.f0;
+ String partitionName = bucketPartition.f1;
+ long endOffset = flussTableLakeSnapshot.getLogEndOffset(bucketPartition);
+ long maxTimestamp = flussTableLakeSnapshot.getMaxTimestamp(bucketPartition);
Review Comment:
Yes, we just need the timestamp in here.
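The revised loop iterates the bucket keys once and resolves the offset and timestamp through per-key getters instead of walking a single entry set. A stand-in sketch of that lookup pattern (String keys replace `Tuple2<TableBucket, String>` and the Pb request types are omitted, so this is illustrative only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Stand-in for FlussTableLakeSnapshot: exposes the bucket keys plus per-key
// lookups, mirroring tablePartitionBuckets()/getLogEndOffset()/getMaxTimestamp().
class SnapshotSketch {

    private final Map<String, Long> logEndOffsets = new HashMap<>();
    private final Map<String, Long> maxTimestamps = new HashMap<>();

    // Record one bucket's end offset and max timestamp.
    void add(String bucketKey, long endOffset, long maxTimestamp) {
        logEndOffsets.put(bucketKey, endOffset);
        maxTimestamps.put(bucketKey, maxTimestamp);
    }

    Set<String> tablePartitionBuckets() {
        return logEndOffsets.keySet();
    }

    long getLogEndOffset(String bucketKey) {
        return logEndOffsets.get(bucketKey);
    }

    long getMaxTimestamp(String bucketKey) {
        return maxTimestamps.get(bucketKey);
    }
}
```

The loop body in the diff then becomes a straight iteration over `tablePartitionBuckets()`, fetching both values for each key.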
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]