This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9e3f1ac739c branch-3.0: [improvement](ccr) Add and adjust result for
get_lag #48953 (#49054)
9e3f1ac739c is described below
commit 9e3f1ac739cad4d4653bb43f2df9cae46b7c7bdf
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 17 15:46:53 2025 +0800
branch-3.0: [improvement](ccr) Add and adjust result for get_lag #48953
(#49054)
Cherry-picked from #48953
Co-authored-by: Uniqueyou <[email protected]>
---
.../org/apache/doris/binlog/BinlogLagInfo.java | 17 +++++++++++++-
.../java/org/apache/doris/binlog/BinlogUtils.java | 27 +++++++++++-----------
.../apache/doris/service/FrontendServiceImpl.java | 2 ++
gensrc/thrift/FrontendService.thrift | 2 ++
4 files changed, 34 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
index 83b4181fa2f..4328958d879 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
@@ -23,13 +23,18 @@ public class BinlogLagInfo {
private long lastCommitSeq;
private long firstCommitTs;
private long lastCommitTs;
+ private long nextCommitSeq;
+ private long nextCommitTs;
- public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq,
long firstCommitTs, long lastCommitTs) {
+ public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq,
long firstCommitTs, long lastCommitTs,
+ long nextCommitSeq, long nextCommitTs) {
this.lag = lag;
this.firstCommitSeq = firstCommitSeq;
this.lastCommitSeq = lastCommitSeq;
this.firstCommitTs = firstCommitTs;
this.lastCommitTs = lastCommitTs;
+ this.nextCommitSeq = nextCommitSeq;
+ this.nextCommitTs = nextCommitTs;
}
public BinlogLagInfo() {
@@ -38,6 +43,16 @@ public class BinlogLagInfo {
lastCommitSeq = 0;
firstCommitTs = 0;
lastCommitTs = 0;
+ nextCommitSeq = 0;
+ nextCommitTs = 0;
+ }
+
+ public long getNextCommitSeq() {
+ return nextCommitSeq;
+ }
+
+ public long getNextCommitTs() {
+ return nextCommitTs;
}
public long getLag() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 66350cec0d3..0347b94c530 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -68,30 +68,31 @@ public class BinlogUtils {
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
BinlogLagInfo lagInfo = new BinlogLagInfo(binlogs.size(),
firstBinlog.getCommitSeq(),
- lastBinlog.getCommitSeq(), firstBinlog.getTimestamp(),
lastBinlog.getTimestamp());
+ lastBinlog.getCommitSeq(), firstBinlog.getTimestamp(),
lastBinlog.getTimestamp(),
+ firstBinlog.getCommitSeq(), firstBinlog.getTimestamp());
return Pair.of(status, lagInfo);
}
// find first binlog whose commitSeq > commitSeq
TBinlog guard = new TBinlog();
guard.setCommitSeq(prevCommitSeq);
- TBinlog binlog = binlogs.higher(guard);
+ TBinlog nextBinlog = binlogs.higher(guard);
// all prevCommitSeq <= commitSeq
+ long lastCommitSeq = lastBinlog.getCommitSeq();
+ long lastCommitTs = lastBinlog.getTimestamp();
+ long firstCommitSeq = firstBinlog.getCommitSeq();
+ long firstCommitTs = firstBinlog.getTimestamp();
long lag = 0;
- long lastCommitSeq = 0;
- long lastCommitTs = 0;
- long firstCommitSeq = 0;
- long firstCommitTs = 0;
- if (binlog != null) {
- lag = binlogs.tailSet(binlog).size();
- firstCommitSeq = binlog.getCommitSeq();
- firstCommitTs = binlog.getTimestamp();
- lastCommitSeq = lastBinlog.getCommitSeq();
- lastCommitTs = lastBinlog.getTimestamp();
+ long nextCommitSeq = 0;
+ long nextCommitTs = 0;
+ if (nextBinlog != null) {
+ lag = binlogs.tailSet(nextBinlog).size();
+ nextCommitSeq = nextBinlog.getCommitSeq();
+ nextCommitTs = nextBinlog.getTimestamp();
}
return Pair.of(status, new BinlogLagInfo(lag, firstCommitSeq,
lastCommitSeq,
- firstCommitTs, lastCommitTs));
+ firstCommitTs, lastCommitTs, nextCommitSeq, nextCommitTs));
}
public static TBinlog newDummyBinlog(long dbId, long tableId) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 075d664893a..38d81d64bfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3389,6 +3389,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.setLastCommitSeq(lagInfo.getLastCommitSeq());
result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs());
result.setLastBinlogTimestamp(lagInfo.getLastCommitTs());
+ result.setNextCommitSeq(lagInfo.getNextCommitSeq());
+ result.setNextBinlogTimestamp(lagInfo.getNextCommitTs());
}
return result;
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 48929ef721b..556668c8ef8 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1497,6 +1497,8 @@ struct TGetBinlogLagResult {
5: optional i64 last_commit_seq
6: optional i64 first_binlog_timestamp
7: optional i64 last_binlog_timestamp
+ 8: optional i64 next_commit_seq
+ 9: optional i64 next_binlog_timestamp
}
struct TUpdateFollowerStatsCacheRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]