This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2957b197302 [improve](binlog) Filter the truncated partitions (#41611)
2957b197302 is described below
commit 2957b197302e1ebd582a5abc44da7a88e1e2dfc3
Author: walter <[email protected]>
AuthorDate: Wed Oct 9 20:30:07 2024 +0800
[improve](binlog) Filter the truncated partitions (#41611)
The corresponding test suite case is in https://github.com/selectdb/ccr-syncer/pull/190
---
.../main/java/org/apache/doris/binlog/BinlogManager.java | 2 +-
.../src/main/java/org/apache/doris/binlog/DBBinlog.java | 12 ++++++++++++
.../org/apache/doris/binlog/TruncateTableRecord.java | 16 ++++++++++++++++
.../org/apache/doris/datasource/InternalCatalog.java | 3 +--
.../java/org/apache/doris/persist/TruncateTableInfo.java | 13 ++++++++++++-
5 files changed, 42 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index ecdc1b2ade7..079a3c6527a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -318,7 +318,7 @@ public class BinlogManager {
TruncateTableRecord record = new TruncateTableRecord(info);
String data = record.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
info);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
record);
}
public void addTableRename(TableInfo info, long commitSeq) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 8469bdcc7d1..86cf8085a42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -141,6 +141,13 @@ public class DBBinlog {
}
}
}
+ } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE) {
+ TruncateTableRecord record =
TruncateTableRecord.fromJson(binlog.data);
+ if (record != null) {
+ for (long partitionId : record.getOldPartitionIds()) {
+ droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
+ }
+ }
}
if (tableIds == null) {
@@ -214,6 +221,11 @@ public class DBBinlog {
}
}
}
+ } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw
instanceof TruncateTableRecord) {
+ TruncateTableRecord truncateTableRecord =
(TruncateTableRecord) raw;
+ for (long partitionId :
truncateTableRecord.getOldPartitionIds()) {
+ droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
+ }
}
switch (binlog.getType()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/TruncateTableRecord.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TruncateTableRecord.java
index 0c43ce781cd..cb5b5641889 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TruncateTableRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TruncateTableRecord.java
@@ -22,6 +22,11 @@ import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
public class TruncateTableRecord {
@SerializedName(value = "dbId")
private long dbId;
@@ -35,6 +40,8 @@ public class TruncateTableRecord {
private boolean isEntireTable = false;
@SerializedName(value = "rawSql")
private String rawSql = "";
+ @SerializedName(value = "op")
+ private Map<Long, String> oldPartitions = new HashMap<>();
public TruncateTableRecord(TruncateTableInfo info) {
this.dbId = info.getDbId();
@@ -43,9 +50,18 @@ public class TruncateTableRecord {
this.table = info.getTable();
this.isEntireTable = info.isEntireTable();
this.rawSql = info.getRawSql();
+ this.oldPartitions = info.getOldPartitions();
+ }
+
+ public Collection<Long> getOldPartitionIds() {
+ return oldPartitions == null ? new ArrayList<>() :
oldPartitions.keySet();
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
+
+ public static TruncateTableRecord fromJson(String json) {
+ return GsonUtils.GSON.fromJson(json, TruncateTableRecord.class);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 9f7d27669d8..d2176de0100 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3698,8 +3698,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// write edit log
TruncateTableInfo info =
new TruncateTableInfo(db.getId(), db.getFullName(),
olapTable.getId(), olapTable.getName(),
- newPartitions,
- truncateEntireTable,
truncateTableStmt.toSqlWithoutTable());
+ newPartitions, truncateEntireTable,
truncateTableStmt.toSqlWithoutTable(), oldPartitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (DdlException e) {
failedCleanCallback.run();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
index a9a91f28839..b252b2a3823 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
@@ -28,7 +28,9 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TruncateTableInfo implements Writable {
@SerializedName(value = "dbId")
@@ -45,13 +47,15 @@ public class TruncateTableInfo implements Writable {
private boolean isEntireTable = false;
@SerializedName(value = "rawSql")
private String rawSql = "";
+ @SerializedName(value = "op")
+ private Map<Long, String> oldPartitions = new HashMap<>();
public TruncateTableInfo() {
}
public TruncateTableInfo(long dbId, String db, long tblId, String table,
List<Partition> partitions,
- boolean isEntireTable, String rawSql) {
+ boolean isEntireTable, String rawSql, List<Partition>
oldPartitions) {
this.dbId = dbId;
this.db = db;
this.tblId = tblId;
@@ -59,6 +63,9 @@ public class TruncateTableInfo implements Writable {
this.partitions = partitions;
this.isEntireTable = isEntireTable;
this.rawSql = rawSql;
+ for (Partition partition : oldPartitions) {
+ this.oldPartitions.put(partition.getId(), partition.getName());
+ }
}
public long getDbId() {
@@ -81,6 +88,10 @@ public class TruncateTableInfo implements Writable {
return partitions;
}
+ public Map<Long, String> getOldPartitions() {
+ return oldPartitions == null ? new HashMap<>() : oldPartitions;
+ }
+
public boolean isEntireTable() {
return isEntireTable;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]