This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 92176c46bf9 branch-2.1: [feat](binlog) filter the async mv binlogs #49028 (#49099)
92176c46bf9 is described below
commit 92176c46bf9bf3c8474b2a24605a828d7b66234e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 28 10:01:00 2025 +0800
branch-2.1: [feat](binlog) filter the async mv binlogs #49028 (#49099)
Cherry-picked from #49028
Co-authored-by: walter <[email protected]>
---
.../org/apache/doris/binlog/BinlogConfigCache.java | 70 +++++++++++++++-------
.../org/apache/doris/binlog/BinlogManager.java | 22 +++++++
2 files changed, 69 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
index b07f5e5d87c..0bce3f375aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,10 +35,12 @@ public class BinlogConfigCache {
private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class);
private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all use id
+ private Map<Long, TableIf.TableType> tableTypeMap;
private ReentrantReadWriteLock lock;
public BinlogConfigCache() {
dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
+ tableTypeMap = new HashMap<Long, TableIf.TableType>();
lock = new ReentrantReadWriteLock();
}
@@ -93,29 +96,8 @@ public class BinlogConfigCache {
lock.writeLock().lock();
try {
- Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
- if (db == null) {
- LOG.warn("db not found. dbId: {}", dbId);
- return null;
- }
-
- Table table = db.getTableNullable(tableId);
- if (table == null) {
- LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
- return null;
- }
- if (!(table instanceof OlapTable)) {
- LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
- return null;
- }
-
- OlapTable olapTable = (OlapTable) table;
- tableBinlogConfig = olapTable.getBinlogConfig();
- // get table binlog config, when table modify binlogConfig
- // it create a new binlog, not update inplace, so we don't need to clone
- // binlogConfig
- dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
- return tableBinlogConfig;
+ loadTableBinlogConfig(dbId, tableId);
+ return dbTableBinlogEnableMap.get(tableId); // null if not exists
} catch (Exception e) {
LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
return null;
@@ -124,6 +106,48 @@ public class BinlogConfigCache {
}
}
+ private void loadTableBinlogConfig(long dbId, long tableId) {
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ LOG.warn("db not found. dbId: {}", dbId);
+ return;
+ }
+
+ Table table = db.getTableNullable(tableId);
+ if (table == null) {
+ LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+ return;
+ }
+ if (!(table instanceof OlapTable)) { // MTMV is an instance of OlapTable
+ LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
+ return;
+ }
+
+ OlapTable olapTable = (OlapTable) table;
+ // get table binlog config, when table modify binlogConfig
+ // it create a new binlog, not update inplace, so we don't need to clone
+ // binlogConfig
+ dbTableBinlogEnableMap.put(tableId, olapTable.getBinlogConfig());
+ tableTypeMap.put(tableId, table.getType());
+ }
+
+ public boolean isAsyncMvTable(long dbId, long tableId) {
+ lock.readLock().lock();
+ TableIf.TableType tableType = tableTypeMap.get(tableId);
+ lock.readLock().unlock();
+ if (tableType != null) {
+ return tableType == TableIf.TableType.MATERIALIZED_VIEW;
+ }
+
+ lock.writeLock().lock();
+ try {
+ loadTableBinlogConfig(dbId, tableId);
+ return tableTypeMap.get(tableId) == TableIf.TableType.MATERIALIZED_VIEW;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
public boolean isEnableTable(long dbId, long tableId) {
BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
if (tableBinlogConfig == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b35092a830d..094a4b27c9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -107,11 +107,33 @@ public class BinlogManager {
}
}
+ private boolean isAsyncMvBinlog(TBinlog binlog) {
+ if (!binlog.isSetTableIds()) {
+ return false;
+ }
+
+ // Filter the binlogs belong to async materialized view, since we don't support async mv right now.
+ for (long tableId : binlog.getTableIds()) {
+ if (binlogConfigCache.isAsyncMvTable(binlog.getDbId(), tableId)) {
+ LOG.debug("filter the async mv binlog, db {}, table {}, commit seq {}, ts {}, type {}, data {}",
+ binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(),
+ binlog.getType(), binlog.getData());
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private void addBinlog(TBinlog binlog, Object raw) {
if (!Config.enable_feature_binlog) {
return;
}
+ if (isAsyncMvBinlog(binlog)) {
+ return;
+ }
+
LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}",
binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(),
binlog.getData());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]