This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ece2fe34e [lake] Lake tiering service report the max timestamp of
fluss record (#1700)
ece2fe34e is described below
commit ece2fe34e1f6a672b931486cef2e88be7992ac0c
Author: Liebing <[email protected]>
AuthorDate: Wed Sep 17 20:20:50 2025 +0800
[lake] Lake tiering service report the max timestamp of fluss record (#1700)
---
.../tiering/committer/FlussTableLakeSnapshot.java | 27 ++++-
.../committer/FlussTableLakeSnapshotCommitter.java | 26 ++--
.../tiering/committer/TieringCommitOperator.java | 10 +-
.../tiering/source/TableBucketWriteResult.java | 12 +-
.../source/TableBucketWriteResultSerializer.java | 6 +
.../flink/tiering/source/TieringSplitReader.java | 29 ++++-
.../committer/TieringCommitOperatorTest.java | 131 +++++++++++++++++----
.../TableBucketWriteResultSerializerTest.java | 5 +-
fluss-rpc/src/main/proto/FlussApi.proto | 2 +
.../fluss/server/entity/LakeBucketOffset.java | 9 +-
.../org/apache/fluss/server/log/LogTablet.java | 11 ++
.../fluss/server/replica/ReplicaManager.java | 8 ++
.../fluss/server/utils/ServerRpcMessageUtils.java | 14 ++-
.../apache/fluss/server/zk/ZooKeeperClient.java | 6 +
.../fluss/server/zk/data/LakeTableSnapshot.java | 17 ++-
.../server/zk/data/LakeTableSnapshotJsonSerde.java | 14 +++
.../replica/CommitLakeTableSnapshotITCase.java | 16 ++-
.../replica/NotifyReplicaLakeTableOffsetTest.java | 28 +++--
.../zk/data/LakeTableSnapshotJsonSerdeTest.java | 18 ++-
19 files changed, 328 insertions(+), 61 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
index b0ddc062a..510949902 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
@@ -22,6 +22,7 @@ import org.apache.fluss.utils.types.Tuple2;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
/** A lake snapshot for a Fluss table. */
class FlussTableLakeSnapshot {
@@ -34,10 +35,15 @@ class FlussTableLakeSnapshot {
// if the bucket is not of a partition, the partition_name is null
private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
+ // <table_bucket, partition_name> -> max timestamps,
+ // if the bucket is not of a partition, the partition_name is null
+ private final Map<Tuple2<TableBucket, String>, Long> maxTimestamps;
+
FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
this.tableId = tableId;
this.lakeSnapshotId = lakeSnapshotId;
this.logEndOffsets = new HashMap<>();
+ this.maxTimestamps = new HashMap<>();
}
public long tableId() {
@@ -48,16 +54,27 @@ class FlussTableLakeSnapshot {
return lakeSnapshotId;
}
- public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
- return logEndOffsets;
+ public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
+ return logEndOffsets.keySet();
}
- public void addBucketOffset(TableBucket bucket, long offset) {
+ public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset,
long timestamp) {
logEndOffsets.put(Tuple2.of(bucket, null), offset);
+ maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
}
- public void addPartitionBucketOffset(TableBucket bucket, String
partitionName, long offset) {
+ public void addPartitionBucketOffsetAndTimestamp(
+ TableBucket bucket, String partitionName, long offset, long
timestamp) {
logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
+ maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
+ }
+
+ public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
+ return logEndOffsets.get(bucketPartition);
+ }
+
+ public long getMaxTimestamp(Tuple2<TableBucket, String> bucketPartition) {
+ return maxTimestamps.get(bucketPartition);
}
@Override
@@ -69,6 +86,8 @@ class FlussTableLakeSnapshot {
+ lakeSnapshotId
+ ", logEndOffsets="
+ logEndOffsets
+ + ", maxTimestamps="
+ + maxTimestamps
+ '}';
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index a0adc775b..a6cc47767 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -88,7 +88,12 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
Long partitionId = partitionBucket.f0;
if (partitionId == null) {
tableBucket = new TableBucket(tableId, partitionBucket.f1);
- flussTableLakeSnapshot.addBucketOffset(tableBucket,
entry.getValue());
+ // we use -1 since we don't store timestamp in lake snapshot
property for
+ // simplicity, it may cause the timestamp to be -1 during
constructing lake
+ // snapshot to commit to Fluss.
+ // But it should happen rarely and should be a normal value
after next tiering.
+ flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
+ tableBucket, entry.getValue(), -1);
} else {
tableBucket = new TableBucket(tableId, partitionId,
partitionBucket.f1);
// the partition name is qualified partition name in format:
@@ -98,8 +103,11 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
ResolvedPartitionSpec resolvedPartitionSpec =
ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
- flussTableLakeSnapshot.addPartitionBucketOffset(
- tableBucket, resolvedPartitionSpec.getPartitionName(),
entry.getValue());
+ flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
+ tableBucket,
+ resolvedPartitionSpec.getPartitionName(),
+ entry.getValue(),
+ -1);
}
}
commit(flussTableLakeSnapshot);
@@ -114,13 +122,14 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
- for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry
:
- flussTableLakeSnapshot.logEndOffsets().entrySet()) {
+ for (Tuple2<TableBucket, String> bucketPartition :
+ flussTableLakeSnapshot.tablePartitionBuckets()) {
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
pbLakeTableSnapshotInfo.addBucketsReq();
- TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
- String partitionName = bucketEndOffsetEntry.getKey().f1;
- long endOffset = bucketEndOffsetEntry.getValue();
+ TableBucket tableBucket = bucketPartition.f0;
+ String partitionName = bucketPartition.f1;
+ long endOffset =
flussTableLakeSnapshot.getLogEndOffset(bucketPartition);
+ long maxTimestamp =
flussTableLakeSnapshot.getMaxTimestamp(bucketPartition);
if (tableBucket.getPartitionId() != null) {
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
}
@@ -129,6 +138,7 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
}
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
+ pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
}
return commitLakeTableSnapshotRequest;
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 282d12d80..779ca8913 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -229,10 +229,14 @@ public class TieringCommitOperator<WriteResult,
Committable>
for (TableBucketWriteResult<WriteResult> writeResult :
committableWriteResults) {
TableBucket tableBucket = writeResult.tableBucket();
if (writeResult.tableBucket().getPartitionId() == null) {
- flussTableLakeSnapshot.addBucketOffset(tableBucket,
writeResult.logEndOffset());
+ flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
+ tableBucket, writeResult.logEndOffset(),
writeResult.maxTimestamp());
} else {
- flussTableLakeSnapshot.addPartitionBucketOffset(
- tableBucket, writeResult.partitionName(),
writeResult.logEndOffset());
+
flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
+ tableBucket,
+ writeResult.partitionName(),
+ writeResult.logEndOffset(),
+ writeResult.maxTimestamp());
}
}
flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
index 7249cc4e5..ba648f297 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
@@ -42,12 +42,16 @@ public class TableBucketWriteResult<WriteResult> implements
Serializable {
// null when the bucket is not for a partition
@Nullable private final String partitionName;
- // will be null when no any data write, such as for tiering a empty log
split
+    // will be null when no data was written, such as for tiering an empty log
split
@Nullable private final WriteResult writeResult;
// the end offset of tiering, should be the last tiered record's offset + 1
private final long logEndOffset;
+ // the max timestamp of tiering, should be the last tiered record's
timestamp,
+ // will be -1 for empty log splits or snapshot splits
+ private final long maxTimestamp;
+
// the total number of write results in one round of tiering,
// used by the downstream committer operator to determine when all write
results
// for the round of tiering are finished
@@ -59,12 +63,14 @@ public class TableBucketWriteResult<WriteResult> implements
Serializable {
@Nullable String partitionName,
@Nullable WriteResult writeResult,
long logEndOffset,
+ long maxTimestamp,
int numberOfWriteResults) {
this.tablePath = tablePath;
this.tableBucket = tableBucket;
this.partitionName = partitionName;
this.writeResult = writeResult;
this.logEndOffset = logEndOffset;
+ this.maxTimestamp = maxTimestamp;
this.numberOfWriteResults = numberOfWriteResults;
}
@@ -93,4 +99,8 @@ public class TableBucketWriteResult<WriteResult> implements
Serializable {
public long logEndOffset() {
return logEndOffset;
}
+
+ public long maxTimestamp() {
+ return maxTimestamp;
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
index 37412b472..365176095 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
@@ -85,6 +85,9 @@ public class TableBucketWriteResultSerializer<WriteResult>
// serialize log end offset
out.writeLong(tableBucketWriteResult.logEndOffset());
+ // serialize max timestamp
+ out.writeLong(tableBucketWriteResult.maxTimestamp());
+
// serialize number of write results
out.writeInt(tableBucketWriteResult.numberOfWriteResults());
@@ -129,6 +132,8 @@ public class TableBucketWriteResultSerializer<WriteResult>
// deserialize log end offset
long logEndOffset = in.readLong();
+ // deserialize max timestamp
+ long maxTimestamp = in.readLong();
// deserialize number of write results
int numberOfWriteResults = in.readInt();
return new TableBucketWriteResult<>(
@@ -137,6 +142,7 @@ public class TableBucketWriteResultSerializer<WriteResult>
partitionName,
writeResult,
logEndOffset,
+ maxTimestamp,
numberOfWriteResults);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index 80e2e4c6f..bcee1861d 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -68,6 +68,9 @@ public class TieringSplitReader<WriteResult>
private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
+ // unknown bucket timestamp for empty split or snapshot split
+ private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
+
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
// the id for the pending tables to be tiered
@@ -285,7 +288,10 @@ public class TieringSplitReader<WriteResult>
writeResults.put(
bucket,
completeLakeWriter(
- bucket,
currentTieringSplit.getPartitionName(), stoppingOffset));
+ bucket,
+ currentTieringSplit.getPartitionName(),
+ stoppingOffset,
+ lastRecord.timestamp()));
// put split of the bucket
finishedSplitIds.put(bucket, currentSplitId);
LOG.info("Split {} has been finished.", currentSplitId);
@@ -316,7 +322,10 @@ public class TieringSplitReader<WriteResult>
}
private TableBucketWriteResult<WriteResult> completeLakeWriter(
- TableBucket bucket, @Nullable String partitionName, long
logEndOffset)
+ TableBucket bucket,
+ @Nullable String partitionName,
+ long logEndOffset,
+ long maxTimestamp)
throws IOException {
LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
WriteResult writeResult = lakeWriter.complete();
@@ -327,6 +336,7 @@ public class TieringSplitReader<WriteResult>
partitionName,
writeResult,
logEndOffset,
+ maxTimestamp,
checkNotNull(currentTableNumberOfSplits));
}
@@ -344,6 +354,7 @@ public class TieringSplitReader<WriteResult>
logSplit.getPartitionName(),
null,
logSplit.getStoppingOffset(),
+ UNKNOWN_BUCKET_TIMESTAMP,
logSplit.getNumberOfSplits()));
}
return new TableBucketWriteResultWithSplitIds(writeResults,
finishedSplitIds);
@@ -363,7 +374,10 @@ public class TieringSplitReader<WriteResult>
String splitId =
currentTableSplitsByBucket.remove(tableBucket).splitId();
TableBucketWriteResult<WriteResult> writeResult =
completeLakeWriter(
- tableBucket, currentSnapshotSplit.getPartitionName(),
logEndOffset);
+ tableBucket,
+ currentSnapshotSplit.getPartitionName(),
+ logEndOffset,
+ UNKNOWN_BUCKET_TIMESTAMP);
closeCurrentSnapshotSplit();
mayFinishCurrentTable();
return new TableBucketWriteResultWithSplitIds(
@@ -483,9 +497,16 @@ public class TieringSplitReader<WriteResult>
@Nullable String partitionName,
@Nullable WriteResult writeResult,
long endLogOffset,
+ long maxTimestamp,
int numberOfSplits) {
return new TableBucketWriteResult<>(
- tablePath, tableBucket, partitionName, writeResult,
endLogOffset, numberOfSplits);
+ tablePath,
+ tableBucket,
+ partitionName,
+ writeResult,
+ endLogOffset,
+ maxTimestamp,
+ numberOfSplits);
}
private class TableBucketWriteResultWithSplitIds
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index e6c412cad..6f0ab20f4 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -29,6 +29,8 @@ import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LakeTableSnapshot;
import org.apache.fluss.utils.types.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -56,6 +58,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static
org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
import static org.assertj.core.api.Assertions.assertThat;
@@ -108,61 +111,77 @@ class TieringCommitOperatorTest extends FlinkTestBase {
TableBucket t1b0 = new TableBucket(tableId1, 0);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath1, t1b0, 1, 11, numberOfWriteResults));
+ tablePath1, t1b0, 1, 11, 21L, numberOfWriteResults));
// table1, bucket 1
TableBucket t1b1 = new TableBucket(tableId1, 1);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath1, t1b1, 2, 12, numberOfWriteResults));
+ tablePath1, t1b1, 2, 12, 22L, numberOfWriteResults));
verifyNoLakeSnapshot(tablePath1);
// table2, bucket0
TableBucket t2b0 = new TableBucket(tableId2, 0);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath2, t2b0, 1, 21, numberOfWriteResults));
+ tablePath2, t2b0, 1, 21, 31L, numberOfWriteResults));
// table2, bucket1
TableBucket t2b1 = new TableBucket(tableId2, 1);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath2, t2b1, 2, 22, numberOfWriteResults));
+ tablePath2, t2b1, 2, 22, 32L, numberOfWriteResults));
verifyNoLakeSnapshot(tablePath2);
// add table1, bucket2
TableBucket t1b2 = new TableBucket(tableId1, 2);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath1, t1b2, 3, 13, numberOfWriteResults));
+ tablePath1, t1b2, 3, 13, 23L, numberOfWriteResults));
// verify lake snapshot
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
expectedLogEndOffsets.put(t1b0, 11L);
expectedLogEndOffsets.put(t1b1, 12L);
expectedLogEndOffsets.put(t1b2, 13L);
- verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets);
+ Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
+ expectedMaxTimestamps.put(t1b0, 21L);
+ expectedMaxTimestamps.put(t1b1, 22L);
+ expectedMaxTimestamps.put(t1b2, 23L);
+ verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets,
expectedMaxTimestamps);
// add table2, bucket2
TableBucket t2b2 = new TableBucket(tableId2, 2);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath2, t2b2, 4, 23, numberOfWriteResults));
+ tablePath2, t2b2, 4, 23, 33L, numberOfWriteResults));
expectedLogEndOffsets = new HashMap<>();
expectedLogEndOffsets.put(t2b0, 21L);
expectedLogEndOffsets.put(t2b1, 22L);
expectedLogEndOffsets.put(t2b2, 23L);
- verifyLakeSnapshot(tablePath2, tableId2, 1, expectedLogEndOffsets);
+ expectedMaxTimestamps = new HashMap<>();
+ expectedMaxTimestamps.put(t2b0, 31L);
+ expectedMaxTimestamps.put(t2b1, 32L);
+ expectedMaxTimestamps.put(t2b2, 33L);
+ verifyLakeSnapshot(tablePath2, tableId2, 1, expectedLogEndOffsets,
expectedMaxTimestamps);
// let's process one round of TableBucketWriteResult again
expectedLogEndOffsets = new HashMap<>();
+ expectedMaxTimestamps = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
TableBucket tableBucket = new TableBucket(tableId1, bucket);
long offset = bucket * bucket;
+ long timestamp = bucket * bucket;
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath1, tableBucket, bucket, offset,
numberOfWriteResults));
+ tablePath1,
+ tableBucket,
+ bucket,
+ offset,
+ timestamp,
+ numberOfWriteResults));
expectedLogEndOffsets.put(tableBucket, offset);
+ expectedMaxTimestamps.put(tableBucket, timestamp);
}
- verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets);
+ verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets,
expectedMaxTimestamps);
}
@Test
@@ -172,13 +191,16 @@ class TieringCommitOperatorTest extends FlinkTestBase {
Map<String, Long> partitionIdByNames =
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
+ Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
int numberOfWriteResults = 3 * partitionIdByNames.size();
long offset = 0;
+ long timestamp = System.currentTimeMillis();
for (int bucket = 0; bucket < 3; bucket++) {
for (Map.Entry<String, Long> partitionIdAndNameEntry :
partitionIdByNames.entrySet()) {
long partitionId = partitionIdAndNameEntry.getValue();
TableBucket tableBucket = new TableBucket(tableId,
partitionId, bucket);
long currentOffset = offset++;
+ long currentTimestamp = timestamp++;
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
tablePath,
@@ -186,11 +208,14 @@ class TieringCommitOperatorTest extends FlinkTestBase {
partitionIdAndNameEntry.getKey(),
1,
currentOffset,
+ currentTimestamp,
numberOfWriteResults));
expectedLogEndOffsets.put(tableBucket, currentOffset);
+ expectedMaxTimestamps.put(tableBucket, currentTimestamp);
}
if (bucket == 2) {
- verifyLakeSnapshot(tablePath, tableId, 1,
expectedLogEndOffsets);
+ verifyLakeSnapshot(
+ tablePath, tableId, 1, expectedLogEndOffsets,
expectedMaxTimestamps);
} else {
verifyNoLakeSnapshot(tablePath);
}
@@ -208,7 +233,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
TableBucket tableBucket = new TableBucket(tableId, bucket);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath1, tableBucket, null, 3,
numberOfWriteResults));
+ tablePath1, tableBucket, null, 3, 6L,
numberOfWriteResults));
}
verifyNoLakeSnapshot(tablePath1);
@@ -223,16 +248,25 @@ class TieringCommitOperatorTest extends FlinkTestBase {
bucket,
// just use bucket as log offset
bucket,
+ bucket,
numberOfWriteResults));
}
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath1, new TableBucket(tableId, 0), null, 3,
numberOfWriteResults));
+ tablePath1,
+ new TableBucket(tableId, 0),
+ null,
+ 3,
+ 6L,
+ numberOfWriteResults));
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
expectedLogEndOffsets.put(new TableBucket(tableId, 1), 1L);
expectedLogEndOffsets.put(new TableBucket(tableId, 2), 2L);
- verifyLakeSnapshot(tablePath1, tableId, 1, expectedLogEndOffsets);
+ Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
+ expectedMaxTimestamps.put(new TableBucket(tableId, 1), 1L);
+ expectedMaxTimestamps.put(new TableBucket(tableId, 2), 2L);
+ verifyLakeSnapshot(tablePath1, tableId, 1, expectedLogEndOffsets,
expectedMaxTimestamps);
}
@Test
@@ -256,7 +290,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
TableBucket tableBucket = new TableBucket(tableId, bucket);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath, tableBucket, 3, 3,
numberOfWriteResults));
+ tablePath, tableBucket, 3, 3, 3L,
numberOfWriteResults));
}
verifyLakeSnapshot(
@@ -264,6 +298,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
tableId,
2,
getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+ getExpectedMaxTimestamps(tableId, mockCommittedSnapshot),
mockCommittedSnapshot.getQualifiedPartitionNameById(),
String.format(
"The current Fluss's lake snapshot %s is less than
lake actual snapshot %d committed by Fluss for table: {tablePath=%s,
tableId=%d},"
@@ -275,16 +310,19 @@ class TieringCommitOperatorTest extends FlinkTestBase {
mockCommittedSnapshot));
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
+ Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
TableBucket tableBucket = new TableBucket(tableId, bucket);
long offset = bucket * bucket;
+ long timestamp = bucket * bucket;
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath, tableBucket, 3, offset,
numberOfWriteResults));
+ tablePath, tableBucket, 3, offset, timestamp,
numberOfWriteResults));
expectedLogEndOffsets.put(tableBucket, offset);
+ expectedMaxTimestamps.put(tableBucket, timestamp);
}
- verifyLakeSnapshot(tablePath, tableId, 3, expectedLogEndOffsets);
+ verifyLakeSnapshot(tablePath, tableId, 3, expectedLogEndOffsets,
expectedMaxTimestamps);
}
@Test
@@ -316,7 +354,13 @@ class TieringCommitOperatorTest extends FlinkTestBase {
TableBucket tableBucket = new TableBucket(tableId,
partitionId, bucket);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath, tableBucket, partitionName, 3, 3,
numberOfWriteResults));
+ tablePath,
+ tableBucket,
+ partitionName,
+ 3,
+ 3,
+ 3L,
+ numberOfWriteResults));
}
}
@@ -325,6 +369,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
tableId,
3,
getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+ getExpectedMaxTimestamps(tableId, mockCommittedSnapshot),
mockCommittedSnapshot.getQualifiedPartitionNameById(),
String.format(
"The current Fluss's lake snapshot %s is less than
lake actual snapshot %d committed by Fluss for table: {tablePath=%s,
tableId=%d}, missing snapshot: %s.",
@@ -373,15 +418,38 @@ class TieringCommitOperatorTest extends FlinkTestBase {
return expectedLogEndOffsets;
}
+ private Map<TableBucket, Long> getExpectedMaxTimestamps(
+ long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
+ Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
+ for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
+ committedLakeSnapshot.getLogEndOffsets().entrySet()) {
+ Tuple2<Long, Integer> partitionBucket = entry.getKey();
+ if (partitionBucket.f0 == null) {
+ expectedMaxTimestamps.put(new TableBucket(tableId,
partitionBucket.f1), -1L);
+ } else {
+ expectedMaxTimestamps.put(
+ new TableBucket(tableId, partitionBucket.f0,
partitionBucket.f1), -1L);
+ }
+ }
+ return expectedMaxTimestamps;
+ }
+
private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
createTableBucketWriteResultStreamRecord(
TablePath tablePath,
TableBucket tableBucket,
@Nullable Integer writeResult,
long logEndOffset,
+ long maxTimestamp,
int numberOfWriteResults) {
return createTableBucketWriteResultStreamRecord(
- tablePath, tableBucket, null, writeResult, logEndOffset,
numberOfWriteResults);
+ tablePath,
+ tableBucket,
+ null,
+ writeResult,
+ logEndOffset,
+ maxTimestamp,
+ numberOfWriteResults);
}
private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
@@ -391,6 +459,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
@Nullable String partitionName,
@Nullable Integer writeResult,
long logEndOffset,
+ long maxTimestamp,
int numberOfWriteResults) {
TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
new TableBucketWriteResult<>(
@@ -399,6 +468,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
partitionName,
writeResult == null ? null : new
TestingWriteResult(writeResult),
logEndOffset,
+ maxTimestamp,
numberOfWriteResults);
return new StreamRecord<>(tableBucketWriteResult);
}
@@ -413,13 +483,17 @@ class TieringCommitOperatorTest extends FlinkTestBase {
TablePath tablePath,
long tableId,
long expectedSnapshotId,
- Map<TableBucket, Long> expectedLogEndOffsets)
+ Map<TableBucket, Long> expectedLogEndOffsets,
+ Map<TableBucket, Long> expectedMaxTimestamp)
throws Exception {
LakeSnapshot lakeSnapshot =
admin.getLatestLakeSnapshot(tablePath).get();
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(expectedSnapshotId);
assertThat(lakeSnapshot.getTableBucketsOffset()).isEqualTo(expectedLogEndOffsets);
- // check the tableId has been send to mark finished
+ // TODO: replace with LakeSnapshot when support max timestamp
+ verifyLakeSnapshotMaxTimestamp(tableId, expectedMaxTimestamp);
+
+ // check the tableId has been sent to mark finished
List<OperatorEvent> operatorEvents =
mockOperatorEventGateway.getEventsSent();
SourceEventWrapper sourceEventWrapper =
(SourceEventWrapper) operatorEvents.get(operatorEvents.size()
- 1);
@@ -433,6 +507,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
long tableId,
long expectedSnapshotId,
Map<TableBucket, Long> expectedLogEndOffsets,
+ Map<TableBucket, Long> expectedMaxTimestamp,
Map<Long, String> expectedPartitionIdByName,
String failedReason)
throws Exception {
@@ -441,7 +516,10 @@ class TieringCommitOperatorTest extends FlinkTestBase {
assertThat(lakeSnapshot.getTableBucketsOffset()).isEqualTo(expectedLogEndOffsets);
assertThat(lakeSnapshot.getPartitionNameById()).isEqualTo(expectedPartitionIdByName);
- // check the tableId has been send to mark failed
+ // TODO: replace with LakeSnapshot when support max timestamp
+ verifyLakeSnapshotMaxTimestamp(tableId, expectedMaxTimestamp);
+
+ // check the tableId has been sent to mark failed
List<OperatorEvent> operatorEvents =
mockOperatorEventGateway.getEventsSent();
SourceEventWrapper sourceEventWrapper =
(SourceEventWrapper) operatorEvents.get(operatorEvents.size()
- 1);
@@ -451,6 +529,15 @@ class TieringCommitOperatorTest extends FlinkTestBase {
assertThat(finishTieringEvent.failReason()).contains(failedReason);
}
+ private void verifyLakeSnapshotMaxTimestamp(
+ long tableId, Map<TableBucket, Long> expectedMaxTimestamp) throws
Exception {
+ ZooKeeperClient zkClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ Optional<LakeTableSnapshot> lakeTableSnapshotOpt =
zkClient.getLakeTableSnapshot(tableId);
+ assertThat(lakeTableSnapshotOpt).isNotEmpty();
+ LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotOpt.get();
+
assertThat(lakeTableSnapshot.getBucketMaxTimestamp()).isEqualTo(expectedMaxTimestamp);
+ }
+
private static class MockOperatorEventDispatcher implements
OperatorEventDispatcher {
private final OperatorEventGateway operatorEventGateway;
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
index 2c718cbae..dbb40eae1 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
@@ -44,7 +44,7 @@ class TableBucketWriteResultSerializerTest {
String partitionName = isPartitioned ? "partition1" : null;
TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
new TableBucketWriteResult<>(
- tablePath, tableBucket, partitionName,
testingWriteResult, 10, 20);
+ tablePath, tableBucket, partitionName,
testingWriteResult, 10, 30L, 20);
// test serialize and deserialize
byte[] serialized =
tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
@@ -63,7 +63,8 @@ class TableBucketWriteResultSerializerTest {
// verify when writeResult is null
tableBucketWriteResult =
- new TableBucketWriteResult<>(tablePath, tableBucket,
partitionName, null, 20, 30);
+ new TableBucketWriteResult<>(
+ tablePath, tableBucket, partitionName, null, 20, 30L,
30);
serialized =
tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
deserialized =
tableBucketWriteResultSerializer.deserialize(
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index 2ddaa44e8..19e401095 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -449,6 +449,7 @@ message PbLakeTableOffsetForBucket {
optional int64 log_start_offset = 3;
optional int64 log_end_offset = 4;
optional string partition_name = 5;
+ optional int64 max_timestamp = 6;
}
message CommitLakeTableSnapshotResponse {
@@ -473,6 +474,7 @@ message PbNotifyLakeTableOffsetReqForBucket {
required int64 snapshot_id = 4;
optional int64 log_start_offset = 5;
optional int64 log_end_offset = 6;
+ optional int64 max_timestamp = 7;
}
message NotifyLakeTableOffsetResponse {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
b/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
index 9ccc46a9e..5692245a4 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java
@@ -25,11 +25,14 @@ public class LakeBucketOffset {
private final Long logStartOffset;
private final Long logEndOffset;
+ private final Long maxTimestamp;
- public LakeBucketOffset(long snapshotId, Long logStartOffset, Long
logEndOffset) {
+ public LakeBucketOffset(
+ long snapshotId, Long logStartOffset, Long logEndOffset, Long
maxTimestamp) {
this.snapshotId = snapshotId;
this.logStartOffset = logStartOffset;
this.logEndOffset = logEndOffset;
+ this.maxTimestamp = maxTimestamp;
}
public long getSnapshotId() {
@@ -43,4 +46,8 @@ public class LakeBucketOffset {
public Optional<Long> getLogEndOffset() {
return Optional.ofNullable(logEndOffset);
}
+
+ public Optional<Long> getMaxTimestamp() {
+ return Optional.ofNullable(maxTimestamp);
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 1470081a8..b4c8f1c9a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -125,6 +125,7 @@ public final class LogTablet {
// note: currently, for primary key table, the log start offset is never
updated
private volatile long lakeLogStartOffset = Long.MAX_VALUE;
private volatile long lakeLogEndOffset = -1L;
+ private volatile long lakeMaxTimestamp = -1;
private LogTablet(
PhysicalTablePath physicalPath,
@@ -250,6 +251,10 @@ public final class LogTablet {
return lakeLogEndOffset;
}
+ public long getLakeMaxTimestamp() {
+ return lakeMaxTimestamp;
+ }
+
public int getWriterIdCount() {
return writerStateManager.writerIdCount();
}
@@ -526,6 +531,12 @@ public final class LogTablet {
}
}
+ public void updateLakeMaxTimestamp(long lakeMaxTimestamp) {
+ if (lakeMaxTimestamp > this.lakeMaxTimestamp) {
+ this.lakeMaxTimestamp = lakeMaxTimestamp;
+ }
+ }
+
public void loadWriterSnapshot(long lastOffset) throws IOException {
synchronized (lock) {
rebuildWriterState(lastOffset, writerStateManager);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 876e8e984..f11fe1399 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -759,6 +759,10 @@ public class ReplicaManager {
.getLogEndOffset()
.ifPresent(logTablet::updateLakeLogEndOffset);
+ lakeBucketOffset
+ .getMaxTimestamp()
+ .ifPresent(logTablet::updateLakeMaxTimestamp);
+
responseCallback.accept(new
NotifyLakeTableOffsetResponse());
}
});
@@ -818,6 +822,10 @@ public class ReplicaManager {
lakeTableSnapshot
.getLogEndOffset(tb)
.ifPresent(replica.getLogTablet()::updateLakeLogEndOffset);
+
+ lakeTableSnapshot
+ .getMaxTimestamp(tb)
+ .ifPresent(replica.getLogTablet()::updateLakeMaxTimestamp);
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index f77cf378e..823c2bb52 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1428,6 +1428,7 @@ public class ServerRpcMessageUtils {
long snapshotId = pdLakeTableSnapshotInfo.getSnapshotId();
Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
Map<Long, String> partitionNameByPartitionId = new HashMap<>();
for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
@@ -1447,8 +1448,13 @@ public class ServerRpcMessageUtils {
lakeTableOffsetForBucket.hasLogEndOffset()
? lakeTableOffsetForBucket.getLogEndOffset()
: null;
+ Long logMaxTimestamp =
+ lakeTableOffsetForBucket.hasMaxTimestamp()
+ ? lakeTableOffsetForBucket.getMaxTimestamp()
+ : null;
bucketLogStartOffset.put(tableBucket, logStartOffset);
bucketLogEndOffset.put(tableBucket, logEndOffset);
+ bucketMaxTimestamp.put(tableBucket, logMaxTimestamp);
if (lakeTableOffsetForBucket.hasPartitionName()) {
partitionNameByPartitionId.put(
@@ -1462,6 +1468,7 @@ public class ServerRpcMessageUtils {
tableId,
bucketLogStartOffset,
bucketLogEndOffset,
+ bucketMaxTimestamp,
partitionNameByPartitionId));
}
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId);
@@ -1483,6 +1490,8 @@ public class ServerRpcMessageUtils {
lakeTableSnapshot.getLogEndOffset(tableBucket).ifPresent(reqForBucket::setLogEndOffset);
+
lakeTableSnapshot.getMaxTimestamp(tableBucket).ifPresent(reqForBucket::setMaxTimestamp);
+
return reqForBucket;
}
@@ -1502,8 +1511,11 @@ public class ServerRpcMessageUtils {
reqForBucket.hasLogStartOffset() ?
reqForBucket.getLogStartOffset() : null;
Long logEndOffset =
reqForBucket.hasLogEndOffset() ?
reqForBucket.getLogEndOffset() : null;
+ Long maxTimestamp =
+ reqForBucket.hasMaxTimestamp() ?
reqForBucket.getMaxTimestamp() : null;
lakeBucketOffsetMap.put(
- tableBucket, new LakeBucketOffset(snapshotId,
logStartOffset, logEndOffset));
+ tableBucket,
+ new LakeBucketOffset(snapshotId, logStartOffset,
logEndOffset, maxTimestamp));
}
return new NotifyLakeTableOffsetData(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index d555cd1df..98654972a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -1048,6 +1048,11 @@ public class ZooKeeperClient implements AutoCloseable {
new HashMap<>(previous.getBucketLogEndOffset());
bucketLogEndOffset.putAll(lakeTableSnapshot.getBucketLogEndOffset());
+ // merge max timestamp; the current value will override the previous one
+ Map<TableBucket, Long> bucketMaxTimestamp =
+ new HashMap<>(previous.getBucketMaxTimestamp());
+
bucketMaxTimestamp.putAll(lakeTableSnapshot.getBucketMaxTimestamp());
+
Map<Long, String> partitionNameById =
new HashMap<>(previous.getPartitionNameIdByPartitionId());
partitionNameById.putAll(lakeTableSnapshot.getPartitionNameIdByPartitionId());
@@ -1058,6 +1063,7 @@ public class ZooKeeperClient implements AutoCloseable {
lakeTableSnapshot.getTableId(),
bucketLogStartOffset,
bucketLogEndOffset,
+ bucketMaxTimestamp,
partitionNameById);
zkClient.setData().forPath(path,
LakeTableZNode.encode(lakeTableSnapshot));
} else {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
index 4abccc1b3..9d65e3bf0 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshot.java
@@ -32,10 +32,11 @@ public class LakeTableSnapshot {
// the log offset of the bucket
- // mapping from bucket id to log start/end offset,
+ // mapping from bucket id to log start/end offset or max timestamp,
// will be null if log offset is unknown such as reading the snapshot of
primary key table
private final Map<TableBucket, Long> bucketLogStartOffset;
private final Map<TableBucket, Long> bucketLogEndOffset;
+ private final Map<TableBucket, Long> bucketMaxTimestamp;
// mapping from partition id to partition name, will be empty if the table
is not partitioned
// table
@@ -46,11 +47,13 @@ public class LakeTableSnapshot {
long tableId,
Map<TableBucket, Long> bucketLogStartOffset,
Map<TableBucket, Long> bucketLogEndOffset,
+ Map<TableBucket, Long> bucketMaxTimestamp,
Map<Long, String> partitionNameIdByPartitionId) {
this.snapshotId = snapshotId;
this.tableId = tableId;
this.bucketLogStartOffset = bucketLogStartOffset;
this.bucketLogEndOffset = bucketLogEndOffset;
+ this.bucketMaxTimestamp = bucketMaxTimestamp;
this.partitionNameIdByPartitionId = partitionNameIdByPartitionId;
}
@@ -78,6 +81,10 @@ public class LakeTableSnapshot {
return Optional.ofNullable(bucketLogEndOffset.get(tableBucket));
}
+ public Optional<Long> getMaxTimestamp(TableBucket tableBucket) {
+ return Optional.ofNullable(bucketMaxTimestamp.get(tableBucket));
+ }
+
public Map<TableBucket, Long> getBucketLogEndOffset() {
return bucketLogEndOffset;
}
@@ -86,6 +93,10 @@ public class LakeTableSnapshot {
return bucketLogStartOffset;
}
+ public Map<TableBucket, Long> getBucketMaxTimestamp() {
+ return bucketMaxTimestamp;
+ }
+
public Map<Long, String> getPartitionNameIdByPartitionId() {
return partitionNameIdByPartitionId;
}
@@ -100,6 +111,7 @@ public class LakeTableSnapshot {
&& tableId == that.tableId
&& Objects.equals(bucketLogStartOffset,
that.bucketLogStartOffset)
&& Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset)
+ && Objects.equals(bucketMaxTimestamp, that.bucketMaxTimestamp)
&& Objects.equals(partitionNameIdByPartitionId,
that.partitionNameIdByPartitionId);
}
@@ -110,6 +122,7 @@ public class LakeTableSnapshot {
tableId,
bucketLogStartOffset,
bucketLogEndOffset,
+ bucketMaxTimestamp,
partitionNameIdByPartitionId);
}
@@ -124,6 +137,8 @@ public class LakeTableSnapshot {
+ bucketLogStartOffset
+ ", bucketLogEndOffset="
+ bucketLogEndOffset
+ + ", bucketMaxTimestamp="
+ + bucketMaxTimestamp
+ ", partitionNameIdByPartitionId="
+ partitionNameIdByPartitionId
+ '}';
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
index 111332b25..539da5e02 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
@@ -43,6 +43,7 @@ public class LakeTableSnapshotJsonSerde
private static final String BUCKET_ID = "bucket_id";
private static final String LOG_START_OFFSET = "log_start_offset";
private static final String LOG_END_OFFSET = "log_end_offset";
+ private static final String MAX_TIMESTAMP = "max_timestamp";
private static final String PARTITION_NAME = "partition_name";
private static final int VERSION = 1;
@@ -82,6 +83,11 @@ public class LakeTableSnapshotJsonSerde
LOG_END_OFFSET,
lakeTableSnapshot.getLogEndOffset(tableBucket).get());
}
+ if (lakeTableSnapshot.getMaxTimestamp(tableBucket).isPresent()) {
+ generator.writeNumberField(
+ MAX_TIMESTAMP,
lakeTableSnapshot.getMaxTimestamp(tableBucket).get());
+ }
+
generator.writeEndObject();
}
generator.writeEndArray();
@@ -100,6 +106,7 @@ public class LakeTableSnapshotJsonSerde
Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
while (buckets.hasNext()) {
JsonNode bucket = buckets.next();
@@ -120,6 +127,12 @@ public class LakeTableSnapshotJsonSerde
bucketLogEndOffset.put(tableBucket, null);
}
+ if (bucket.get(MAX_TIMESTAMP) != null) {
+ bucketMaxTimestamp.put(tableBucket,
bucket.get(MAX_TIMESTAMP).asLong());
+ } else {
+ bucketMaxTimestamp.put(tableBucket, null);
+ }
+
if (partitionId != null && bucket.get(PARTITION_NAME) != null) {
partitionNameIdByPartitionId.put(
tableBucket.getPartitionId(),
bucket.get(PARTITION_NAME).asText());
@@ -130,6 +143,7 @@ public class LakeTableSnapshotJsonSerde
tableId,
bucketLogStartOffset,
bucketLogEndOffset,
+ bucketMaxTimestamp,
partitionNameIdByPartitionId);
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
index faa4f4a6c..88f3008fa 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
@@ -104,21 +104,25 @@ class CommitLakeTableSnapshotITCase {
long snapshotId = 1;
long dataLakeLogStartOffset = 0;
long dataLakeLogEndOffset = 50;
+ long dataLakeMaxTimestamp = System.currentTimeMillis();
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
genCommitLakeTableSnapshotRequest(
tableId,
BUCKET_NUM,
snapshotId,
dataLakeLogStartOffset,
- dataLakeLogEndOffset);
+ dataLakeLogEndOffset,
+ dataLakeMaxTimestamp);
coordinatorGateway.commitLakeTableSnapshot(commitLakeTableSnapshotRequest).get();
Map<TableBucket, Long> bucketsLogStartOffset = new HashMap<>();
Map<TableBucket, Long> bucketsLogEndOffset = new HashMap<>();
+ Map<TableBucket, Long> bucketsMaxTimestamp = new HashMap<>();
for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
TableBucket tb = new TableBucket(tableId, bucket);
bucketsLogStartOffset.put(tb, dataLakeLogStartOffset);
bucketsLogEndOffset.put(tb, dataLakeLogEndOffset);
+ bucketsMaxTimestamp.put(tb, dataLakeMaxTimestamp);
Replica replica =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
retry(
Duration.ofMinutes(2),
@@ -127,6 +131,7 @@ class CommitLakeTableSnapshotITCase {
assertThat(logTablet.getLakeLogStartOffset())
.isEqualTo(dataLakeLogStartOffset);
assertThat(logTablet.getLakeLogEndOffset()).isEqualTo(dataLakeLogEndOffset);
+
assertThat(logTablet.getLakeMaxTimestamp()).isEqualTo(dataLakeMaxTimestamp);
});
}
@@ -136,6 +141,7 @@ class CommitLakeTableSnapshotITCase {
tableId,
bucketsLogStartOffset,
bucketsLogEndOffset,
+ bucketsMaxTimestamp,
Collections.emptyMap());
checkLakeTableDataInZk(tableId, expectedDataLakeTieredInfo);
}
@@ -146,7 +152,12 @@ class CommitLakeTableSnapshotITCase {
}
private static CommitLakeTableSnapshotRequest
genCommitLakeTableSnapshotRequest(
- long tableId, int buckets, long snapshotId, long logStartOffset,
long logEndOffset) {
+ long tableId,
+ int buckets,
+ long snapshotId,
+ long logStartOffset,
+ long logEndOffset,
+ long maxTimestamp) {
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
new CommitLakeTableSnapshotRequest();
PbLakeTableSnapshotInfo reqForTable =
commitLakeTableSnapshotRequest.addTablesReq();
@@ -161,6 +172,7 @@ class CommitLakeTableSnapshotITCase {
lakeTableOffsetForBucket.setBucketId(tb.getBucket());
lakeTableOffsetForBucket.setLogStartOffset(logStartOffset);
lakeTableOffsetForBucket.setLogEndOffset(logEndOffset);
+ lakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
}
return commitLakeTableSnapshotRequest;
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
index 57270c5e2..0d0b25204 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
@@ -43,30 +43,37 @@ class NotifyReplicaLakeTableOffsetTest extends
ReplicaTestBase {
Replica replica = replicaManager.getReplicaOrException(tb);
// now, notify lake table offset
- notifyAndVerify(tb, replica, 1, 0L, 20L);
+ notifyAndVerify(tb, replica, 1, 0L, 20L, System.currentTimeMillis());
// notify again
- notifyAndVerify(tb, replica, 2, 20L, 30L);
+ notifyAndVerify(tb, replica, 2, 20L, 30L, System.currentTimeMillis());
}
private void notifyAndVerify(
- TableBucket tb, Replica replica, long snapshotId, long
startOffset, long endOffset)
+ TableBucket tb,
+ Replica replica,
+ long snapshotId,
+ long startOffset,
+ long endOffset,
+ long maxTimestamp)
throws Exception {
NotifyLakeTableOffsetData notifyLakeTableOffsetData =
- getNotifyLakeTableOffset(tb, snapshotId, startOffset,
endOffset);
+ getNotifyLakeTableOffset(tb, snapshotId, startOffset,
endOffset, maxTimestamp);
CompletableFuture<NotifyLakeTableOffsetResponse> future = new
CompletableFuture<>();
replicaManager.notifyLakeTableOffset(notifyLakeTableOffsetData,
future::complete);
future.get();
- verifyLakeTableOffset(replica, snapshotId, startOffset, endOffset);
+ verifyLakeTableOffset(replica, snapshotId, startOffset, endOffset,
maxTimestamp);
}
private void verifyLakeTableOffset(
- Replica replica, long snapshotId, long startOffset, long
endOffset) {
+ Replica replica, long snapshotId, long startOffset, long
endOffset, long maxTimestamp) {
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeTableSnapshotId())
.isEqualTo(snapshotId);
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogStartOffset())
.isEqualTo(startOffset);
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogEndOffset())
.isEqualTo(endOffset);
+
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeMaxTimestamp())
+ .isEqualTo(maxTimestamp);
}
private TableBucket makeTableBucket(boolean partitionTable) {
@@ -82,10 +89,15 @@ class NotifyReplicaLakeTableOffsetTest extends
ReplicaTestBase {
}
private NotifyLakeTableOffsetData getNotifyLakeTableOffset(
- TableBucket tableBucket, long snapshotId, long startOffset, long
endOffset) {
+ TableBucket tableBucket,
+ long snapshotId,
+ long startOffset,
+ long endOffset,
+ long maxTimestamp) {
return new NotifyLakeTableOffsetData(
1,
Collections.singletonMap(
- tableBucket, new LakeBucketOffset(snapshotId,
startOffset, endOffset)));
+ tableBucket,
+ new LakeBucketOffset(snapshotId, startOffset,
endOffset, maxTimestamp)));
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
index 306b0320e..282c00d4b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
@@ -39,6 +39,7 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
1L,
Collections.emptyMap(),
Collections.emptyMap(),
+ Collections.emptyMap(),
Collections.emptyMap());
long tableId = 4;
@@ -48,6 +49,9 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
bucketLogEndOffset.put(new TableBucket(tableId, 1), 3L);
bucketLogEndOffset.put(new TableBucket(tableId, 2), 4L);
+ Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+ bucketMaxTimestamp.put(new TableBucket(tableId, 1), 5L);
+ bucketMaxTimestamp.put(new TableBucket(tableId, 2), 6L);
LakeTableSnapshot lakeTableSnapshot2 =
new LakeTableSnapshot(
@@ -55,6 +59,7 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
tableId,
bucketLogStartOffset,
bucketLogEndOffset,
+ bucketMaxTimestamp,
Collections.emptyMap());
tableId = 5;
@@ -69,12 +74,17 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
bucketLogEndOffset.put(new TableBucket(tableId, 1L, 1), 3L);
bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 4L);
+ bucketMaxTimestamp = new HashMap<>();
+ bucketMaxTimestamp.put(new TableBucket(tableId, 1L, 1), 5L);
+ bucketMaxTimestamp.put(new TableBucket(tableId, 2L, 1), 6L);
+
LakeTableSnapshot lakeTableSnapshot3 =
new LakeTableSnapshot(
3,
tableId,
bucketLogStartOffset,
bucketLogEndOffset,
+ bucketMaxTimestamp,
partitionNameIdByPartitionId);
return new LakeTableSnapshot[] {
@@ -87,11 +97,11 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
return new String[] {
"{\"version\":1,\"snapshot_id\":1,\"table_id\":1,\"buckets\":[]}",
"{\"version\":1,\"snapshot_id\":2,\"table_id\":4,"
- +
"\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4},"
- +
"{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3}]}",
+ +
"\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4,\"max_timestamp\":6},"
+ +
"{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3,\"max_timestamp\":5}]}",
"{\"version\":1,\"snapshot_id\":3,\"table_id\":5,"
- +
"\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3},"
- +
"{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4}]}"
+ +
"\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3,\"max_timestamp\":5},"
+ +
"{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4,\"max_timestamp\":6}]}"
};
}
}