This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch filter_dropped_tables in repository https://gitbox.apache.org/repos/asf/doris.git
commit ff89966af5a17bc56e150b9d203a2fed5ce7f137 Author: w41ter <[email protected]> AuthorDate: Wed Jul 17 11:54:59 2024 +0000 [chore](fe) Returns dropped tables in GetMeta request The CCR syncer needs to know the distribution of tables, partitions, indexes, and replicas when synchronizing binlogs. If a table is deleted before the binlog synchronization is complete, the CCR syncer cannot continue synchronizing. This PR will record the deleted tables and include them in the get meta response, allowing the CCR syncer to filter out the binlogs that belong to these tables. The CCR syncer part PR is selectdb/ccr-syncer#123. --- be/src/runtime/snapshot_loader.cpp | 1 - .../org/apache/doris/binlog/BinlogManager.java | 14 ++++++ .../java/org/apache/doris/binlog/DBBinlog.java | 50 +++++++++++++++++----- .../org/apache/doris/binlog/DropTableRecord.java | 4 ++ .../main/java/org/apache/doris/catalog/Env.java | 1 + gensrc/thrift/FrontendService.thrift | 1 + 6 files changed, 60 insertions(+), 11 deletions(-) diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index b840636a46f..f34dfde229a 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -458,7 +458,6 @@ Status SnapshotLoader::remote_http_download( } // Step 3: Validate remote tablet snapshot paths && remote files map - // TODO(Drogon): Add md5sum check // key is remote snapshot paths, value is filelist // get all these use http download action // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 47d39dd4dc1..6fc2e3f813f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -380,6 +380,20 @@ public class BinlogManager { } } + // get the dropped tables of the db. + public List<Long> getDroppedTables(long dbId) { + lock.readLock().lock(); + try { + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { + return Lists.newArrayList(); + } + return dbBinlog.getDroppedTables(); + } finally { + lock.readLock().unlock(); + } + } + public List<BinlogTombstone> gc() { LOG.info("begin gc binlog"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index b43805b06d5..502491004e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -62,6 +62,8 @@ public class DBBinlog { // The commit seq of the dropped partitions private List<Pair<Long, Long>> droppedPartitions; + // The commit seq of the dropped tables + private List<Pair<Long, Long>> droppedTables; private List<TBinlog> tableDummyBinlogs; @@ -79,6 +81,7 @@ public class DBBinlog { tableBinlogMap = Maps.newHashMap(); timestamps = Lists.newArrayList(); droppedPartitions = Lists.newArrayList(); + droppedTables = Lists.newArrayList(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -121,6 +124,11 @@ public class DBBinlog { if (info != null && info.getPartitionId() > 0) { droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq())); } + } else if (binlog.getType() == TBinlogType.DROP_TABLE) { + DropTableRecord record = DropTableRecord.fromJson(binlog.data); + if (record != null && record.getTableId() > 0) { + droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq())); + } } if (tableIds == null) { @@ -174,6 +182,19 @@ public class DBBinlog { if (!binlog.isSetType()) { return; } + + if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { + long partitionId = ((DropPartitionInfo) raw).getPartitionId(); + if (partitionId > 0) { + droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + } + } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) { + long tableId = ((DropTableRecord) raw).getTableId(); + if (tableId > 0) { + droppedTables.add(Pair.of(tableId, binlog.getCommitSeq())); + } + } + switch (binlog.getType()) { case CREATE_TABLE: return; @@ -183,13 +204,6 @@ public class DBBinlog { break; } - if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { - long partitionId = ((DropPartitionInfo) raw).getPartitionId(); - if (partitionId > 0) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); - } - } - for (long tableId : tableIds) { TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable); if (tableBinlog != null) { @@ -237,6 +251,18 @@ public class DBBinlog { } } + // Get the dropped tables of the db. + public List<Long> getDroppedTables() { + lock.readLock().lock(); + try { + return droppedTables.stream() + .map(v -> v.first) + .collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } + public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); @@ -354,7 +380,7 @@ public class DBBinlog { } } - gcDroppedPartitions(largestExpiredCommitSeq); + gcDroppedPartitionAndTables(largestExpiredCommitSeq); if (lastCommitSeq != -1) { dummy.setCommitSeq(lastCommitSeq); } @@ -392,7 +418,7 @@ public class DBBinlog { timeIter.remove(); } - gcDroppedPartitions(lastExpiredBinlog.getCommitSeq()); + gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq()); } return lastExpiredBinlog; @@ -502,11 +528,15 @@ public class DBBinlog { } } - private void gcDroppedPartitions(long commitSeq) { + private void gcDroppedPartitionAndTables(long commitSeq) { Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator(); while (iter.hasNext() && iter.next().second < commitSeq) { iter.remove(); } + iter = droppedTables.iterator(); + while (iter.hasNext() && iter.next().second < commitSeq) { + iter.remove(); + } } // not thread safety, do this without lock diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java index 155898cbc9c..4417edeb973 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java @@ -58,6 +58,10 @@ public class DropTableRecord { return GsonUtils.GSON.toJson(this); } + public static DropTableRecord fromJson(String json) { + return GsonUtils.GSON.fromJson(json, DropTableRecord.class); + } + @Override public String toString() { return toJson(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index d10cba5fd0b..94d1d5c4cfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -6307,6 +6307,7 @@ public class Env { if (Config.enable_feature_binlog) { BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager(); dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId())); + dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId())); } result.setDbMeta(dbMeta); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2867e15c3c1..ecade162304 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1457,6 +1457,7 @@ struct TGetMetaDBMeta { 2: optional string name 3: optional list<TGetMetaTableMeta> tables 4: optional list<i64> dropped_partitions + 5: optional list<i64> dropped_tables } struct TGetMetaResult { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
