This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ad6737796f6 branch-2.1: [fix](binlog) Fix table not gc binlog meta/records #46981 (#47257)
ad6737796f6 is described below
commit ad6737796f6c604de5f17eb5c6c7dde35c06bb76
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 24 17:32:58 2025 +0800
branch-2.1: [fix](binlog) Fix table not gc binlog meta/records #46981 (#47257)
Cherry-picked from #46981
Co-authored-by: Uniqueyou <[email protected]>
---
.../org/apache/doris/binlog/BinlogManager.java | 2 +
.../java/org/apache/doris/binlog/DBBinlog.java | 13 +--
.../java/org/apache/doris/binlog/TableBinlog.java | 26 +++--
.../java/org/apache/doris/binlog/DbBinlogTest.java | 118 +++++++++++++++++++++
.../apache/doris/binlog/MockBinlogConfigCache.java | 5 +
.../org/apache/doris/binlog/TableBinlogTest.java | 117 ++++++++++++++++++++
6 files changed, 263 insertions(+), 18 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 1128aa12cb1..3ec914abe63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -558,9 +558,11 @@ public class BinlogManager {
tombstones.add(dbTombstones);
}
}
+
return tombstones;
}
+
public void replayGc(BinlogGcInfo binlogGcInfo) {
lock.writeLock().lock();
Map<Long, DBBinlog> gcDbBinlogMap;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 0816564f150..9ffc20412fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -287,17 +287,11 @@ public class DBBinlog {
if (dbBinlogConfig == null) {
LOG.error("db not found. dbId: {}", dbId);
return null;
- }
-
- BinlogTombstone tombstone;
- if (dbBinlogConfig.isEnable()) {
- // db binlog is enabled, only one binlogTombstones
- tombstone = dbBinlogEnableGc(dbBinlogConfig);
+ } else if (!dbBinlogConfig.isEnable()) {
+ return dbBinlogDisableGc();
} else {
- tombstone = dbBinlogDisableGc();
+ return dbBinlogEnableGc(dbBinlogConfig);
}
-
- return tombstone;
}
private BinlogTombstone collectTableTombstone(List<BinlogTombstone>
tableTombstones, boolean isDbGc) {
@@ -341,6 +335,7 @@ public class DBBinlog {
tombstones.add(tombstone);
}
}
+
BinlogTombstone tombstone = collectTableTombstone(tombstones, false);
if (tombstone != null) {
removeExpiredMetaData(tombstone.getCommitSeq());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index f3279b328c9..162adc2603b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -197,8 +197,11 @@ public class TableBinlog {
public BinlogTombstone gc() {
// step 1: get expire time
BinlogConfig tableBinlogConfig =
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
+ Boolean isCleanFullBinlog = false;
if (tableBinlogConfig == null) {
return null;
+ } else if (!tableBinlogConfig.isEnable()) {
+ isCleanFullBinlog = true;
}
long ttlSeconds = tableBinlogConfig.getTtlSeconds();
@@ -208,22 +211,27 @@ public class TableBinlog {
LOG.info(
"gc table binlog. dbId: {}, tableId: {}, expiredMs: {},
ttlSecond: {}, maxBytes: {}, "
- + "maxHistoryNums: {}, now: {}",
- dbId, tableId, expiredMs, ttlSeconds, maxBytes,
maxHistoryNums, System.currentTimeMillis());
+ + "maxHistoryNums: {}, now: {}, isCleanFullBinlog: {}",
+ dbId, tableId, expiredMs, ttlSeconds, maxBytes,
maxHistoryNums, System.currentTimeMillis(),
+ isCleanFullBinlog);
// step 2: get tombstoneUpsertBinlog and dummyBinlog
Pair<TBinlog, Long> tombstoneInfo;
lock.writeLock().lock();
try {
- // find the last expired commit seq.
long expiredCommitSeq = -1;
- Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
- while (timeIterator.hasNext()) {
- Pair<Long, Long> entry = timeIterator.next();
- if (expiredMs < entry.second) {
- break;
+ if (isCleanFullBinlog) {
+ expiredCommitSeq = binlogs.last().getCommitSeq();
+ } else {
+ // find the last expired commit seq.
+ Iterator<Pair<Long, Long>> timeIterator =
timestamps.iterator();
+ while (timeIterator.hasNext()) {
+ Pair<Long, Long> entry = timeIterator.next();
+ if (expiredMs < entry.second) {
+ break;
+ }
+ expiredCommitSeq = entry.first;
}
- expiredCommitSeq = entry.first;
}
final long lastExpiredCommitSeq = expiredCommitSeq;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java
index 06230bfce56..939ae49f232 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java
@@ -304,4 +304,122 @@ public class DbBinlogTest {
}
}
}
+
+ @Test
+ public void testDbAndTableGcWithDisable() {
+ // init base data
+ long expiredTime = baseNum + expiredBinlogNum;
+ Map<String, Long> ttlMap = Maps.newHashMap();
+ for (int i = 0; i < tableNum; ++i) {
+ String key = String.format("%d_%d", dbId, baseTableId + i);
+ ttlMap.put(key, expiredTime);
+ }
+ MockBinlogConfigCache binlogConfigCache =
BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+ // disable db binlog
+ binlogConfigCache.addDbBinlogConfig(dbId, false, 0L);
+ // disable some table binlog
+ for (int i = 0; i <= gcTableNum; i++) {
+ binlogConfigCache.addTableBinlogConfig(dbId, baseTableId + i,
false, expiredTime);
+ }
+
+ // init & add binlogs
+ List<TBinlog> testBinlogs = Lists.newArrayList();
+ Long[] tableLastCommitInfo = new Long[tableNum];
+ long maxGcTableId = baseTableId + gcTableNum;
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ long tableId = baseTableId + (i / tableNum);
+ long commitSeq = baseNum + i;
+ tableLastCommitInfo[i / tableNum] = commitSeq;
+ TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId,
commitSeq, baseNum);
+ testBinlogs.add(binlog);
+ }
+
+ // init DbBinlog
+ DBBinlog dbBinlog = null;
+
+ // insert binlogs
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ if (dbBinlog == null) {
+ dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i));
+ }
+ dbBinlog.addBinlog(testBinlogs.get(i), null);
+ }
+
+ // trigger gc
+ BinlogTombstone tombstone = dbBinlog.gc();
+
+ // check binlog status - all binlogs should be cleared for disabled tables
+ for (TBinlog binlog : testBinlogs) {
+ long tableId = binlog.getTableIds().get(0);
+ if (tableId <= maxGcTableId) {
+ // For disabled tables, all binlogs should be cleared
+ Assert.assertEquals(0, binlog.getTableRef());
+ } else {
+ // For enabled tables, only expired binlogs should be cleared
+ if (binlog.getTimestamp() <= expiredTime) {
+ Assert.assertEquals(0, binlog.getTableRef());
+ } else {
+ Assert.assertEquals(1, binlog.getTableRef());
+ }
+ }
+ }
+
+ // check tombstone
+ Assert.assertFalse(tombstone.isDbBinlogTomstone());
+ Assert.assertEquals(baseNum + totalBinlogNum - 1,
tombstone.getCommitSeq());
+ }
+
+ @Test
+ public void testDbAndTableGcWithEnable() {
+ // init base data
+ long expiredTime = baseNum + expiredBinlogNum;
+ Map<String, Long> ttlMap = Maps.newHashMap();
+ for (int i = 0; i < tableNum; ++i) {
+ String key = String.format("%d_%d", dbId, baseTableId + i);
+ ttlMap.put(key, expiredTime);
+ }
+ MockBinlogConfigCache binlogConfigCache =
BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+ // enable db binlog
+ binlogConfigCache.addDbBinlogConfig(dbId, true, expiredTime);
+ // enable all table binlog
+ for (int i = 0; i < tableNum; i++) {
+ binlogConfigCache.addTableBinlogConfig(dbId, baseTableId + i,
true, expiredTime);
+ }
+
+ // init & add binlogs
+ List<TBinlog> testBinlogs = Lists.newArrayList();
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ long tableId = baseTableId + (i / tableNum);
+ long commitSeq = baseNum + i;
+ TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId,
commitSeq, baseNum + i);
+ testBinlogs.add(binlog);
+ }
+
+ // init DbBinlog
+ DBBinlog dbBinlog = null;
+
+ // insert binlogs
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ if (dbBinlog == null) {
+ dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i));
+ }
+ dbBinlog.addBinlog(testBinlogs.get(i), null);
+ }
+
+ // trigger gc
+ BinlogTombstone tombstone = dbBinlog.gc();
+
+ // check binlog status - only expired binlogs should be cleared
+ for (TBinlog binlog : testBinlogs) {
+ if (binlog.getTimestamp() <= expiredTime) {
+ Assert.assertEquals(0, binlog.getTableRef());
+ } else {
+ Assert.assertEquals(1, binlog.getTableRef());
+ }
+ }
+
+ // check tombstone
+ Assert.assertTrue(tombstone.isDbBinlogTomstone());
+ Assert.assertEquals(expiredTime, tombstone.getCommitSeq());
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java
index 4622171e930..d2720bf61d5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java
@@ -34,6 +34,11 @@ final class MockBinlogConfigCache extends BinlogConfigCache {
mockedConfigs.put(String.valueOf(dbId), config);
}
+ public void addTableBinlogConfig(long dbId, long tableId, boolean
enableBinlog, long expiredTime) {
+ BinlogConfig config =
BinlogTestUtils.newTestBinlogConfig(enableBinlog, expiredTime);
+ mockedConfigs.put(String.format("%d_%d", dbId, tableId), config);
+ }
+
@Override
public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
return mockedConfigs.get(String.format("%d_%d", dbId, tableId));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
index cd86c5935e1..e55a1f252f0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.binlog;
import org.apache.doris.thrift.TBinlog;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import mockit.Mock;
import mockit.MockUp;
import org.junit.Assert;
@@ -27,6 +28,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
+import java.util.Map;
public class TableBinlogTest {
private long dbId = 10000;
@@ -139,4 +141,119 @@ public class TableBinlogTest {
TBinlog dummy = tableBinlog.getDummyBinlog();
Assert.assertEquals(expiredCommitSeq, dummy.getCommitSeq());
}
+
+ @Test
+ public void testTableGcBinlogWithDisable() {
+ // mock BinlogUtils
+ new MockUp<BinlogUtils>() {
+ @Mock
+ public long getExpiredMs(long direct) {
+ return direct;
+ }
+ };
+ Map<String, Long> ttlMap = Maps.newHashMap();
+
+ // init base data
+ long expiredTime = baseNum + expiredBinlogNum;
+ ttlMap.put(String.format("%d_%d", dbId, tableId), expiredTime);
+
+ MockBinlogConfigCache binlogConfigCache =
BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+
+ // disable table binlog
+ binlogConfigCache.addTableBinlogConfig(dbId, tableId, false,
expiredTime);
+
+ // init & add binlogs
+ List<TBinlog> testBinlogs = Lists.newArrayList();
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum
+ i, baseNum + i);
+ testBinlogs.add(binlog);
+ }
+
+ // init TableBinlog
+ TableBinlog tableBinlog = null;
+
+ // insert binlogs
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ if (tableBinlog == null) {
+ tableBinlog = new TableBinlog(binlogConfigCache,
testBinlogs.get(i), dbId, tableId);
+ }
+ tableBinlog.addBinlog(testBinlogs.get(i));
+ }
+
+ // trigger gc
+ BinlogTombstone tombstone = tableBinlog.gc();
+
+ // check binlog status - all binlogs should be cleared when table binlog is disabled
+ for (TBinlog binlog : testBinlogs) {
+ Assert.assertEquals(0, binlog.getTableRef());
+ }
+
+ // check tombstone
+ Assert.assertFalse(tombstone.isDbBinlogTomstone());
+ Assert.assertEquals(baseNum + totalBinlogNum - 1,
tombstone.getCommitSeq());
+
+ // check dummy - should have the last commitSeq
+ TBinlog dummy = tableBinlog.getDummyBinlog();
+ Assert.assertEquals(baseNum + totalBinlogNum - 1,
dummy.getCommitSeq());
+ }
+
+ @Test
+ public void testTableGcBinlogWithEnable() {
+ // mock BinlogUtils
+ new MockUp<BinlogUtils>() {
+ @Mock
+ public long getExpiredMs(long direct) {
+ return direct;
+ }
+ };
+ Map<String, Long> ttlMap = Maps.newHashMap();
+
+ // init base data
+ long expiredTime = baseNum + expiredBinlogNum;
+ ttlMap.put(String.format("%d_%d", dbId, tableId), expiredTime);
+
+ MockBinlogConfigCache binlogConfigCache =
BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+
+ // enable table binlog
+ binlogConfigCache.addTableBinlogConfig(dbId, tableId, true,
expiredTime);
+
+ // init & add binlogs
+ List<TBinlog> testBinlogs = Lists.newArrayList();
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum
+ i, baseNum + i);
+ testBinlogs.add(binlog);
+ }
+
+ // init TableBinlog
+ TableBinlog tableBinlog = null;
+
+ // insert binlogs
+ for (int i = 0; i < totalBinlogNum; ++i) {
+ if (tableBinlog == null) {
+ tableBinlog = new TableBinlog(binlogConfigCache,
testBinlogs.get(i), dbId, tableId);
+ }
+ tableBinlog.addBinlog(testBinlogs.get(i));
+ }
+
+ // trigger gc
+ BinlogTombstone tombstone = tableBinlog.gc();
+
+ // check binlog status - only expired binlogs should be cleared
+ for (TBinlog binlog : testBinlogs) {
+ if (binlog.getTimestamp() <= expiredTime) {
+ Assert.assertEquals(0, binlog.getTableRef());
+ } else {
+ Assert.assertEquals(1, binlog.getTableRef());
+ }
+ }
+
+ // check tombstone
+ Assert.assertFalse(tombstone.isDbBinlogTomstone());
+ Assert.assertEquals(expiredTime, tombstone.getCommitSeq());
+
+ // check dummy - should have the expiredTime as commitSeq
+ TBinlog dummy = tableBinlog.getDummyBinlog();
+ Assert.assertEquals(expiredTime, dummy.getCommitSeq());
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]