This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch filter_dropped_tables
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ff89966af5a17bc56e150b9d203a2fed5ce7f137
Author: w41ter <[email protected]>
AuthorDate: Wed Jul 17 11:54:59 2024 +0000

    [chore](fe) Returns dropped tables in GetMeta request
    
    The CCR syncer needs to know the distribution of tables, partitions,
    indexes, and replicas when synchronizing binlogs. If a table is
    deleted before the binlog synchronization is complete, the CCR syncer
    cannot continue synchronizing. This PR records the dropped tables
    and includes them in the GetMeta response, allowing the CCR syncer
    to filter out the binlogs that belong to these tables.
    
    The corresponding CCR syncer PR is selectdb/ccr-syncer#123.
---
 be/src/runtime/snapshot_loader.cpp                 |  1 -
 .../org/apache/doris/binlog/BinlogManager.java     | 14 ++++++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 50 +++++++++++++++++-----
 .../org/apache/doris/binlog/DropTableRecord.java   |  4 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  1 +
 gensrc/thrift/FrontendService.thrift               |  1 +
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index b840636a46f..f34dfde229a 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -458,7 +458,6 @@ Status SnapshotLoader::remote_http_download(
     }
 
     // Step 3: Validate remote tablet snapshot paths && remote files map
-    // TODO(Drogon): Add md5sum check
     // key is remote snapshot paths, value is filelist
     // get all these use http download action
     // 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 47d39dd4dc1..6fc2e3f813f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -380,6 +380,20 @@ public class BinlogManager {
         }
     }
 
+    // get the dropped tables of the db.
+    public List<Long> getDroppedTables(long dbId) {
+        lock.readLock().lock();
+        try {
+            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
+            if (dbBinlog == null) {
+                return Lists.newArrayList();
+            }
+            return dbBinlog.getDroppedTables();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     public List<BinlogTombstone> gc() {
         LOG.info("begin gc binlog");
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index b43805b06d5..502491004e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -62,6 +62,8 @@ public class DBBinlog {
 
     // The commit seq of the dropped partitions
     private List<Pair<Long, Long>> droppedPartitions;
+    // The commit seq of the dropped tables
+    private List<Pair<Long, Long>> droppedTables;
 
     private List<TBinlog> tableDummyBinlogs;
 
@@ -79,6 +81,7 @@ public class DBBinlog {
         tableBinlogMap = Maps.newHashMap();
         timestamps = Lists.newArrayList();
         droppedPartitions = Lists.newArrayList();
+        droppedTables = Lists.newArrayList();
 
         TBinlog dummy;
         if (binlog.getType() == TBinlogType.DUMMY) {
@@ -121,6 +124,11 @@ public class DBBinlog {
             if (info != null && info.getPartitionId() > 0) {
                 droppedPartitions.add(Pair.of(info.getPartitionId(), 
binlog.getCommitSeq()));
             }
+        } else if (binlog.getType() == TBinlogType.DROP_TABLE) {
+            DropTableRecord record = DropTableRecord.fromJson(binlog.data);
+            if (record != null && record.getTableId() > 0) {
+                droppedTables.add(Pair.of(record.getTableId(), 
binlog.getCommitSeq()));
+            }
         }
 
         if (tableIds == null) {
@@ -174,6 +182,19 @@ public class DBBinlog {
             if (!binlog.isSetType()) {
                 return;
             }
+
+            if (binlog.getType() == TBinlogType.DROP_PARTITION && raw 
instanceof DropPartitionInfo) {
+                long partitionId = ((DropPartitionInfo) raw).getPartitionId();
+                if (partitionId > 0) {
+                    droppedPartitions.add(Pair.of(partitionId, 
binlog.getCommitSeq()));
+                }
+            } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw 
instanceof DropTableRecord) {
+                long tableId = ((DropTableRecord) raw).getTableId();
+                if (tableId > 0) {
+                    droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
+                }
+            }
+
             switch (binlog.getType()) {
                 case CREATE_TABLE:
                     return;
@@ -183,13 +204,6 @@ public class DBBinlog {
                     break;
             }
 
-            if (binlog.getType() == TBinlogType.DROP_PARTITION && raw 
instanceof DropPartitionInfo) {
-                long partitionId = ((DropPartitionInfo) raw).getPartitionId();
-                if (partitionId > 0) {
-                    droppedPartitions.add(Pair.of(partitionId, 
binlog.getCommitSeq()));
-                }
-            }
-
             for (long tableId : tableIds) {
                 TableBinlog tableBinlog = getTableBinlog(binlog, tableId, 
dbBinlogEnable);
                 if (tableBinlog != null) {
@@ -237,6 +251,18 @@ public class DBBinlog {
         }
     }
 
+    // Get the dropped tables of the db.
+    public List<Long> getDroppedTables() {
+        lock.readLock().lock();
+        try {
+            return droppedTables.stream()
+                    .map(v -> v.first)
+                    .collect(Collectors.toList());
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         lock.readLock().lock();
@@ -354,7 +380,7 @@ public class DBBinlog {
                 }
             }
 
-            gcDroppedPartitions(largestExpiredCommitSeq);
+            gcDroppedPartitionAndTables(largestExpiredCommitSeq);
             if (lastCommitSeq != -1) {
                 dummy.setCommitSeq(lastCommitSeq);
             }
@@ -392,7 +418,7 @@ public class DBBinlog {
                 timeIter.remove();
             }
 
-            gcDroppedPartitions(lastExpiredBinlog.getCommitSeq());
+            gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq());
         }
 
         return lastExpiredBinlog;
@@ -502,11 +528,15 @@ public class DBBinlog {
         }
     }
 
-    private void gcDroppedPartitions(long commitSeq) {
+    private void gcDroppedPartitionAndTables(long commitSeq) {
         Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
         while (iter.hasNext() && iter.next().second < commitSeq) {
             iter.remove();
         }
+        iter = droppedTables.iterator();
+        while (iter.hasNext() && iter.next().second < commitSeq) {
+            iter.remove();
+        }
     }
 
     // not thread safety, do this without lock
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java
index 155898cbc9c..4417edeb973 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java
@@ -58,6 +58,10 @@ public class DropTableRecord {
         return GsonUtils.GSON.toJson(this);
     }
 
+    public static DropTableRecord fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, DropTableRecord.class);
+    }
+
     @Override
     public String toString() {
         return toJson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index d10cba5fd0b..94d1d5c4cfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -6307,6 +6307,7 @@ public class Env {
         if (Config.enable_feature_binlog) {
             BinlogManager binlogManager = 
Env.getCurrentEnv().getBinlogManager();
             
dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
+            
dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId()));
         }
 
         result.setDbMeta(dbMeta);
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 2867e15c3c1..ecade162304 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1457,6 +1457,7 @@ struct TGetMetaDBMeta {
     2: optional string name
     3: optional list<TGetMetaTableMeta> tables
     4: optional list<i64> dropped_partitions
+    5: optional list<i64> dropped_tables
 }
 
 struct TGetMetaResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to