This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2d1dac6c527 [improvement](binlog) filter dropped indexes #41246 (#41299)
2d1dac6c527 is described below

commit 2d1dac6c527be2a97a1556714969a60baf1131f3
Author: walter <[email protected]>
AuthorDate: Wed Sep 25 21:56:12 2024 +0800

    [improvement](binlog) filter dropped indexes #41246 (#41299)
    
    cherry pick from #41246
---
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  4 ++
 .../org/apache/doris/binlog/AlterJobRecord.java    | 29 +++++++++++++++
 .../org/apache/doris/binlog/BinlogManager.java     | 16 +++++++-
 .../java/org/apache/doris/binlog/DBBinlog.java     | 43 ++++++++++++++++++++--
 .../main/java/org/apache/doris/catalog/Env.java    |  1 +
 gensrc/thrift/FrontendService.thrift               |  1 +
 6 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 2591d3106e0..f524fecdd9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -916,6 +916,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
     }
 
+    public Map<Long, Long> getIndexIdMap() {
+        return indexIdMap;
+    }
+
     public List<List<String>> getUnfinishedTasks(int limit) {
         List<List<String>> taskInfos = Lists.newArrayList();
         if (jobState == JobState.RUNNING) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java
index 36c772d5246..51d11035300 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java
@@ -18,10 +18,15 @@
 package org.apache.doris.binlog;
 
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.alter.SchemaChangeJobV2;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public class AlterJobRecord {
     @SerializedName(value = "type")
     private AlterJobV2.JobType type;
@@ -37,6 +42,8 @@ public class AlterJobRecord {
     private AlterJobV2.JobState jobState;
     @SerializedName(value = "rawSql")
     private String rawSql;
+    @SerializedName(value = "iim")
+    private Map<Long, Long> indexIdMap;
 
     public AlterJobRecord(AlterJobV2 job) {
         this.type = job.getType();
@@ -46,9 +53,31 @@ public class AlterJobRecord {
         this.jobId = job.getJobId();
         this.jobState = job.getJobState();
         this.rawSql = job.getRawSql();
+        if (type == AlterJobV2.JobType.SCHEMA_CHANGE && job instanceof SchemaChangeJobV2) {
+            this.indexIdMap = ((SchemaChangeJobV2) job).getIndexIdMap();
+        }
+    }
+
+    public boolean isJobFinished() {
+        return jobState == AlterJobV2.JobState.FINISHED;
+    }
+
+    public boolean isSchemaChangeJob() {
+        return type == AlterJobV2.JobType.SCHEMA_CHANGE;
+    }
+
+    public List<Long> getOriginIndexIdList() {
+        if (indexIdMap == null) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(indexIdMap.values());
     }
 
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
     }
+
+    public static AlterJobRecord fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, AlterJobRecord.class);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index fc3115e2b92..db49b5c2b26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -225,7 +225,7 @@ public class BinlogManager {
         AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob);
         String data = alterJobRecord.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJob);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJobRecord);
     }
 
     public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
@@ -383,6 +383,20 @@ public class BinlogManager {
         }
     }
 
+    // get the dropped indexes of the db.
+    public List<Long> getDroppedIndexes(long dbId) {
+        lock.readLock().lock();
+        try {
+            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
+            if (dbBinlog == null) {
+                return Lists.newArrayList();
+            }
+            return dbBinlog.getDroppedIndexes();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     public List<BinlogTombstone> gc() {
         LOG.info("begin gc binlog");
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 502491004e5..8469bdcc7d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -64,6 +64,8 @@ public class DBBinlog {
     private List<Pair<Long, Long>> droppedPartitions;
     // The commit seq of the dropped tables
     private List<Pair<Long, Long>> droppedTables;
+    // The commit seq of the dropped indexes
+    private List<Pair<Long, Long>> droppedIndexes;
 
     private List<TBinlog> tableDummyBinlogs;
 
@@ -82,6 +84,7 @@ public class DBBinlog {
         timestamps = Lists.newArrayList();
         droppedPartitions = Lists.newArrayList();
         droppedTables = Lists.newArrayList();
+        droppedIndexes = Lists.newArrayList();
 
         TBinlog dummy;
         if (binlog.getType() == TBinlogType.DUMMY) {
@@ -129,6 +132,15 @@ public class DBBinlog {
             if (record != null && record.getTableId() > 0) {
                 droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq()));
             }
+        } else if (binlog.getType() == TBinlogType.ALTER_JOB) {
+            AlterJobRecord record = AlterJobRecord.fromJson(binlog.data);
+            if (record != null && record.isSchemaChangeJob() && record.isJobFinished()) {
+                for (Long indexId : record.getOriginIndexIdList()) {
+                    if (indexId != null && indexId > 0) {
+                        droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
+                    }
+                }
+            }
         }
 
         if (tableIds == null) {
@@ -193,6 +205,15 @@ public class DBBinlog {
                 if (tableId > 0) {
                     droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
                 }
+            } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
+                AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
+                if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
+                    for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
+                        if (indexId != null && indexId > 0) {
+                            droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
+                        }
+                    }
+                }
             }
 
             switch (binlog.getType()) {
@@ -263,6 +284,18 @@ public class DBBinlog {
         }
     }
 
+    // Get the dropped indexes of the db.
+    public List<Long> getDroppedIndexes() {
+        lock.readLock().lock();
+        try {
+            return droppedIndexes.stream()
+                    .map(v -> v.first)
+                    .collect(Collectors.toList());
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         lock.readLock().lock();
@@ -380,7 +413,7 @@ public class DBBinlog {
                 }
             }
 
-            gcDroppedPartitionAndTables(largestExpiredCommitSeq);
+            gcDroppedResources(largestExpiredCommitSeq);
             if (lastCommitSeq != -1) {
                 dummy.setCommitSeq(lastCommitSeq);
             }
@@ -418,7 +451,7 @@ public class DBBinlog {
                 timeIter.remove();
             }
 
-            gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq());
+            gcDroppedResources(lastExpiredBinlog.getCommitSeq());
         }
 
         return lastExpiredBinlog;
@@ -528,7 +561,7 @@ public class DBBinlog {
         }
     }
 
-    private void gcDroppedPartitionAndTables(long commitSeq) {
+    private void gcDroppedResources(long commitSeq) {
         Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
         while (iter.hasNext() && iter.next().second < commitSeq) {
             iter.remove();
@@ -537,6 +570,10 @@ public class DBBinlog {
         while (iter.hasNext() && iter.next().second < commitSeq) {
             iter.remove();
         }
+        iter = droppedIndexes.iterator();
+        while (iter.hasNext() && iter.next().second < commitSeq) {
+            iter.remove();
+        }
     }
 
     // not thread safety, do this without lock
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 827e4a222b2..44614c49f8b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5628,6 +5628,7 @@ public class Env {
             BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
             dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
             dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId()));
+            dbMeta.setDroppedIndexes(binlogManager.getDroppedIndexes(db.getId()));
         }
 
         result.setDbMeta(dbMeta);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index c391c802a03..a3c25c17616 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1230,6 +1230,7 @@ struct TGetMetaDBMeta {
     3: optional list<TGetMetaTableMeta> tables
     4: optional list<i64> dropped_partitions
     5: optional list<i64> dropped_tables
+    6: optional list<i64> dropped_indexes
 }
 
 struct TGetMetaResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to