This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 816fd50d1d [Enhancement](binlog) Add binlog enable/disable check in
BinlogManager (#22173)
816fd50d1d is described below
commit 816fd50d1d0eb7f1af5ea02e9290388505b5b06f
Author: Jack Drogon <[email protected]>
AuthorDate: Thu Jul 27 20:16:21 2023 +0800
[Enhancement](binlog) Add binlog enable/disable check in BinlogManager
(#22173)
Signed-off-by: Jack Drogon <[email protected]>
---
.../org/apache/doris/binlog/BinlogConfigCache.java | 146 +++++++++++++++++++++
.../java/org/apache/doris/binlog/BinlogGcer.java | 2 +-
.../org/apache/doris/binlog/BinlogManager.java | 102 +++++++++++---
.../org/apache/doris/binlog/BinlogTombstone.java | 4 +-
.../java/org/apache/doris/binlog/BinlogUtils.java | 36 -----
.../java/org/apache/doris/binlog/DBBinlog.java | 49 ++++---
.../java/org/apache/doris/binlog/TableBinlog.java | 37 ++----
.../main/java/org/apache/doris/catalog/Env.java | 21 +--
.../apache/doris/datasource/InternalCatalog.java | 3 +-
.../doris/persist/AlterDatabasePropertyInfo.java | 10 +-
.../java/org/apache/doris/persist/EditLog.java | 26 ++--
.../persist/ModifyTablePropertyOperationLog.java | 20 ++-
.../persist/ModifyDynamicPartitionInfoTest.java | 2 +-
gensrc/thrift/FrontendService.thrift | 3 +
14 files changed, 339 insertions(+), 122 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
new file mode 100644
index 0000000000..c414b85307
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class BinlogConfigCache {
+ private static final Logger LOG =
LogManager.getLogger(BinlogConfigCache.class);
+
+ private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all
use id
+ private ReentrantReadWriteLock lock;
+
+ public BinlogConfigCache() {
+ dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
+ lock = new ReentrantReadWriteLock();
+ }
+
+ public BinlogConfig getDBBinlogConfig(long dbId) {
+ lock.readLock().lock();
+ BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
+ lock.readLock().unlock();
+ if (binlogConfig != null) {
+ return binlogConfig;
+ }
+
+ lock.writeLock().lock();
+ try {
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ LOG.warn("db not found. dbId: {}", dbId);
+ return null;
+ }
+
+ binlogConfig = db.getBinlogConfig();
+ dbTableBinlogEnableMap.put(dbId, binlogConfig);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return binlogConfig;
+ }
+
+ public boolean isEnableDB(long dbId) {
+ BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
+ if (dBinlogConfig == null) {
+ return false;
+ }
+ return dBinlogConfig.isEnable();
+ }
+
+ public long getDBTtlSeconds(long dbId) {
+ BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
+ if (dBinlogConfig == null) {
+ return BinlogConfig.TTL_SECONDS;
+ }
+ return dBinlogConfig.getTtlSeconds();
+ }
+
+ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
+ lock.readLock().lock();
+ BinlogConfig tableBinlogConfig = dbTableBinlogEnableMap.get(tableId);
+ lock.readLock().unlock();
+ if (tableBinlogConfig != null) {
+ return tableBinlogConfig;
+ }
+
+ lock.writeLock().lock();
+ try {
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ LOG.warn("db not found. dbId: {}", dbId);
+ return null;
+ }
+
+ Table table = db.getTableOrMetaException(tableId);
+ if (table == null) {
+ LOG.warn("fail to get table. db: {}, table id: {}",
db.getFullName(), tableId);
+ return null;
+ }
+ if (!(table instanceof OlapTable)) {
+ LOG.warn("table is not olap table. db: {}, table id: {}",
db.getFullName(), tableId);
+ return null;
+ }
+
+ OlapTable olapTable = (OlapTable) table;
+ tableBinlogConfig = olapTable.getBinlogConfig();
+ dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
+ return tableBinlogConfig;
+ } catch (Exception e) {
+ LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
+ return null;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean isEnableTable(long dbId, long tableId) {
+ BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
+ if (tableBinlogConfig == null) {
+ return false;
+ }
+ return tableBinlogConfig.isEnable();
+ }
+
+ public long getTableTtlSeconds(long dbId, long tableId) {
+ BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
+ if (tableBinlogConfig == null) {
+ return BinlogConfig.TTL_SECONDS;
+ }
+ return tableBinlogConfig.getTtlSeconds();
+ }
+
+ public void remove(long id) {
+ lock.writeLock().lock();
+ try {
+ dbTableBinlogEnableMap.remove(id);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
index 96d41946ff..6dbe47ea22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -41,7 +41,7 @@ import java.util.Map;
public class BinlogGcer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
- private static final long GC_DURATION_MS = 313 * 1000L; // 313s
+ private static final long GC_DURATION_MS = 15 * 1000L; // 15s
// TODO(Drogon): use this to control gc frequency by real gc time waste
sample
private long lastGcTime = 0L;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 11075c4fc4..43a95ed28e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -22,8 +22,10 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
@@ -53,43 +55,63 @@ public class BinlogManager {
private ReentrantReadWriteLock lock;
private Map<Long, DBBinlog> dbBinlogMap;
+ private BinlogConfigCache binlogConfigCache;
public BinlogManager() {
lock = new ReentrantReadWriteLock();
dbBinlogMap = Maps.newHashMap();
+ binlogConfigCache = new BinlogConfigCache();
}
- private void addBinlog(TBinlog binlog) {
- if (!Config.enable_feature_binlog) {
+ private void afterAddBinlog(TBinlog binlog) {
+ if (!binlog.isSetRemoveEnableCache()) {
+ return;
+ }
+ if (!binlog.isRemoveEnableCache()) {
return;
}
- // find db BinlogConfig
long dbId = binlog.getDbId();
- Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
- if (db == null) {
- LOG.warn("db not found. dbId: {}", dbId);
+ boolean onlyDb = true;
+ if (binlog.isSetTableIds()) {
+ for (long tableId : binlog.getTableIds()) {
+ binlogConfigCache.remove(tableId);
+ onlyDb = false;
+ }
+ }
+ if (onlyDb) {
+ binlogConfigCache.remove(dbId);
+ }
+ }
+
+ private void addBinlog(TBinlog binlog) {
+ if (!Config.enable_feature_binlog) {
return;
}
- boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
DBBinlog dbBinlog;
lock.writeLock().lock();
try {
+ long dbId = binlog.getDbId();
dbBinlog = dbBinlogMap.get(dbId);
+
if (dbBinlog == null) {
- dbBinlog = new DBBinlog(binlog);
+ dbBinlog = new DBBinlog(binlogConfigCache, binlog);
dbBinlogMap.put(dbId, dbBinlog);
}
} finally {
lock.writeLock().unlock();
}
- dbBinlog.addBinlog(binlog, dbBinlogEnable);
+ dbBinlog.addBinlog(binlog);
}
private void addBinlog(long dbId, List<Long> tableIds, long commitSeq,
long timestamp, TBinlogType type,
- String data) {
+ String data, boolean removeEnableCache) {
+ if (!Config.enable_feature_binlog) {
+ return;
+ }
+
TBinlog binlog = new TBinlog();
// set commitSeq, timestamp, type, dbId, data
binlog.setCommitSeq(commitSeq);
@@ -101,7 +123,26 @@ public class BinlogManager {
binlog.setTableIds(tableIds);
}
binlog.setTableRef(0);
- addBinlog(binlog);
+ binlog.setRemoveEnableCache(removeEnableCache);
+
+ // Check if all db or table binlog is disable, return
+ boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
+ boolean anyEnable = dbBinlogEnable;
+ if (tableIds != null) {
+ for (long tableId : tableIds) {
+ boolean tableBinlogEnable =
binlogConfigCache.isEnableTable(dbId, tableId);
+ anyEnable = anyEnable || tableBinlogEnable;
+ if (anyEnable) {
+ break;
+ }
+ }
+ }
+
+ if (anyEnable) {
+ addBinlog(binlog);
+ }
+
+ afterAddBinlog(binlog);
}
public void addUpsertRecord(UpsertRecord upsertRecord) {
@@ -112,7 +153,7 @@ public class BinlogManager {
TBinlogType type = TBinlogType.UPSERT;
String data = upsertRecord.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
@@ -124,7 +165,7 @@ public class BinlogManager {
TBinlogType type = TBinlogType.ADD_PARTITION;
String data = addPartitionRecord.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
public void addCreateTableRecord(CreateTableRecord createTableRecord) {
@@ -136,7 +177,7 @@ public class BinlogManager {
TBinlogType type = TBinlogType.CREATE_TABLE;
String data = createTableRecord.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo,
long commitSeq) {
@@ -147,7 +188,7 @@ public class BinlogManager {
TBinlogType type = TBinlogType.DROP_PARTITION;
String data = dropPartitionInfo.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
public void addDropTableRecord(DropTableRecord record) {
@@ -159,7 +200,7 @@ public class BinlogManager {
TBinlogType type = TBinlogType.DROP_TABLE;
String data = record.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
@@ -170,7 +211,7 @@ public class BinlogManager {
TBinlogType type = TBinlogType.ALTER_JOB;
String data = alterJob.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info,
long commitSeq) {
@@ -181,7 +222,29 @@ public class BinlogManager {
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
String data = info.toJson();
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
+ }
+
+ public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long
commitSeq) {
+ long dbId = info.getDbId();
+ List<Long> tableIds = Lists.newArrayList();
+
+ long timestamp = -1;
+ TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY;
+ String data = info.toJson();
+
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
+ }
+
+ public void addModifyTableProperty(ModifyTablePropertyOperationLog info,
long commitSeq) {
+ long dbId = info.getDbId();
+ List<Long> tableIds = Lists.newArrayList();
+ tableIds.add(info.getTableId());
+ long timestamp = -1;
+ TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY;
+ String data = info.toJson();
+
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
}
// get binlog by dbId, return first binlog.version > version
@@ -383,7 +446,8 @@ public class BinlogManager {
if (binlog.getType() == TBinlogType.DUMMY) {
// collect tableDummyBinlogs and dbDummyBinlog to recover
DBBinlog and TableBinlog
if (binlog.getBelong() == -1) {
- DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog,
tableDummies, currentDbBinlogEnable);
+ DBBinlog dbBinlog =
DBBinlog.recoverDbBinlog(binlogConfigCache, binlog, tableDummies,
+ currentDbBinlogEnable);
dbBinlogMap.put(dbId, dbBinlog);
} else {
tableDummies.add(binlog);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
index 2b6e3cb8e1..48d5e04244 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
@@ -92,7 +92,7 @@ public class BinlogTombstone {
return dbId;
}
- // TODO(deadlinefen): delete this code later
+ // TODO(deadlinefen): deprecated this code later
public List<Long> getTableIds() {
if (tableIds == null) {
tableIds = Collections.emptyList();
@@ -102,7 +102,7 @@ public class BinlogTombstone {
public Map<Long, Long> getTableCommitSeqMap() {
if (tableCommitSeqMap == null) {
- tableCommitSeqMap = Collections.emptyMap();
+ tableCommitSeqMap = Maps.newHashMap();
}
return tableCommitSeqMap;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 30cbfd0e15..4e134104b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -17,24 +17,15 @@
package org.apache.doris.binlog;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.TreeSet;
public class BinlogUtils {
- private static final Logger LOG = LogManager.getLogger(BinlogUtils.class);
-
public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs,
long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();
@@ -90,33 +81,6 @@ public class BinlogUtils {
return dummy;
}
- public static boolean tableEnabledBinlog(long dbId, long tableId) {
- Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
- if (db == null) {
- LOG.error("db not found. dbId: {}", dbId);
- return false;
- }
-
- OlapTable table;
- try {
- Table tbl = db.getTableOrMetaException(tableId);
- if (tbl == null) {
- LOG.warn("fail to get table. db: {}, table id: {}",
db.getFullName(), tableId);
- return false;
- }
- if (!(tbl instanceof OlapTable)) {
- LOG.warn("table is not olap table. db: {}, table id: {}",
db.getFullName(), tableId);
- return false;
- }
- table = (OlapTable) tbl;
- } catch (Exception e) {
- LOG.warn("fail to get table. db: {}, table id: {}",
db.getFullName(), tableId);
- return false;
- }
-
- return table.getBinlogConfig().isEnable();
- }
-
public static long getExpiredMs(long ttlSeconds) {
long currentSeconds = System.currentTimeMillis() / 1000;
if (currentSeconds < ttlSeconds) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 151c5e5be9..35134eca87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -17,8 +17,7 @@
package org.apache.doris.binlog;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
@@ -55,9 +54,12 @@ public class DBBinlog {
private List<TBinlog> tableDummyBinlogs;
- public DBBinlog(TBinlog binlog) {
+ private BinlogConfigCache binlogConfigCache;
+
+ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
+ this.binlogConfigCache = binlogConfigCache;
// allBinlogs treeset order by commitSeq
allBinlogs =
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@@ -74,14 +76,16 @@ public class DBBinlog {
allBinlogs.add(dummy);
}
- public static DBBinlog recoverDbBinlog(TBinlog dbDummy, List<TBinlog>
tableDummies, boolean dbBinlogEnable) {
- DBBinlog dbBinlog = new DBBinlog(dbDummy);
+ public static DBBinlog recoverDbBinlog(BinlogConfigCache
binlogConfigCache, TBinlog dbDummy,
+ List<TBinlog> tableDummies, boolean
dbBinlogEnable) {
+ DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
+ long dbId = dbDummy.getDbId();
for (TBinlog tableDummy : tableDummies) {
long tableId = tableDummy.getBelong();
- if (!dbBinlogEnable &&
!BinlogUtils.tableEnabledBinlog(dbBinlog.getDbId(), tableId)) {
+ if (!dbBinlogEnable && !binlogConfigCache.isEnableTable(dbId,
tableId)) {
continue;
}
- dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(tableDummy,
tableId));
+ dbBinlog.tableBinlogMap.put(tableId, new
TableBinlog(binlogConfigCache, tableDummy, dbId, tableId));
dbBinlog.tableDummyBinlogs.add(tableDummy);
}
@@ -111,11 +115,12 @@ public class DBBinlog {
}
}
+ // TODO(Drogon): remove TableBinlog after DropTable, think table drop &&
recovery
private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean
dbBinlogEnable) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
- if (dbBinlogEnable || BinlogUtils.tableEnabledBinlog(dbId,
tableId)) {
- tableBinlog = new TableBinlog(binlog, tableId);
+ if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId,
tableId)) {
+ tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId,
tableId);
tableBinlogMap.put(tableId, tableBinlog);
tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
}
@@ -123,21 +128,25 @@ public class DBBinlog {
return tableBinlog;
}
- public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) {
+ // guard by BinlogManager, if addBinlog called, more than one(db/tables)
enable binlog
+ public void addBinlog(TBinlog binlog) {
+ boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
List<Long> tableIds = binlog.getTableIds();
+
lock.writeLock().lock();
try {
+ allBinlogs.add(binlog);
+
if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
timestamps.add(Pair.of(binlog.getCommitSeq(),
binlog.getTimestamp()));
}
- allBinlogs.add(binlog);
-
if (tableIds == null) {
return;
}
// HACK: for metadata fix
+ // we should not add binlog for create table and drop table in
table binlog
if (!binlog.isSetType()) {
return;
}
@@ -205,22 +214,22 @@ public class DBBinlog {
public BinlogTombstone gc() {
// check db
- Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
- if (db == null) {
+ BinlogConfig dbBinlogConfig =
binlogConfigCache.getDBBinlogConfig(dbId);
+ if (dbBinlogConfig == null) {
LOG.error("db not found. dbId: {}", dbId);
return null;
}
- boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
+ boolean dbBinlogEnable = dbBinlogConfig.isEnable();
BinlogTombstone tombstone;
if (dbBinlogEnable) {
// db binlog is enabled, only one binlogTombstones
- long ttlSeconds = db.getBinlogConfig().getTtlSeconds();
+ long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
tombstone = dbBinlogEnableGc(expiredMs);
} else {
- tombstone = dbBinlogDisableGc(db);
+ tombstone = dbBinlogDisableGc();
}
return tombstone;
@@ -250,7 +259,7 @@ public class DBBinlog {
return dbTombstone;
}
- private BinlogTombstone dbBinlogDisableGc(Database db) {
+ private BinlogTombstone dbBinlogDisableGc() {
List<BinlogTombstone> tombstones = Lists.newArrayList();
List<TableBinlog> tableBinlogs;
@@ -262,7 +271,7 @@ public class DBBinlog {
}
for (TableBinlog tableBinlog : tableBinlogs) {
- BinlogTombstone tombstone = tableBinlog.gc(db);
+ BinlogTombstone tombstone = tableBinlog.ttlGc();
if (tombstone != null) {
tombstones.add(tombstone);
}
@@ -348,7 +357,7 @@ public class DBBinlog {
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
// step 2.1: gc tableBinlog，and get table tombstone
- BinlogTombstone tableTombstone = tableBinlog.gc(expiredCommitSeq);
+ BinlogTombstone tableTombstone =
tableBinlog.commitSeqGc(expiredCommitSeq);
if (tableTombstone != null) {
tableTombstones.add(tableTombstone);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 8934084e99..0857ae7abb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -17,9 +17,7 @@
package org.apache.doris.binlog;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
@@ -37,11 +35,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TableBinlog {
private static final Logger LOG = LogManager.getLogger(TableBinlog.class);
+ private long dbId;
private long tableId;
private ReentrantReadWriteLock lock;
private TreeSet<TBinlog> binlogs;
+ private BinlogConfigCache binlogConfigCache;
- public TableBinlog(TBinlog binlog, long tableId) {
+ public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog,
long dbId, long tableId) {
+ this.dbId = dbId;
this.tableId = tableId;
lock = new ReentrantReadWriteLock();
binlogs =
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@@ -53,6 +54,7 @@ public class TableBinlog {
dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId);
}
binlogs.add(dummy);
+ this.binlogConfigCache = binlogConfigCache;
}
public TBinlog getDummyBinlog() {
@@ -100,7 +102,7 @@ public class TableBinlog {
}
}
- private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired,
BinlogComparator check) {
+ private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired,
BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
}
@@ -111,7 +113,7 @@ public class TableBinlog {
TBinlog lastExpiredBinlog = null;
while (iter.hasNext()) {
TBinlog binlog = iter.next();
- if (check.isExpired(binlog, expired)) {
+ if (checker.isExpired(binlog, expired)) {
lastExpiredBinlog = binlog;
--binlog.table_ref;
if (binlog.getType() == TBinlogType.UPSERT) {
@@ -133,7 +135,7 @@ public class TableBinlog {
}
// this method call when db binlog enable
- public BinlogTombstone gc(long expiredCommitSeq) {
+ public BinlogTombstone commitSeqGc(long expiredCommitSeq) {
Pair<TBinlog, Long> tombstoneInfo;
// step 1: get tombstoneUpsertBinlog and dummyBinlog
@@ -163,31 +165,20 @@ public class TableBinlog {
}
// this method call when db binlog disable
- public BinlogTombstone gc(Database db) {
+ public BinlogTombstone ttlGc() {
// step 1: get expire time
- OlapTable table;
- try {
- Table tbl = db.getTableOrMetaException(tableId);
- if (tbl == null) {
- LOG.warn("fail to get table. db: {}, table id: {}",
db.getFullName(), tableId);
- return null;
- }
- if (!(tbl instanceof OlapTable)) {
- LOG.warn("table is not olap table. db: {}, table id: {}",
db.getFullName(), tableId);
- return null;
- }
- table = (OlapTable) tbl;
- } catch (Exception e) {
- LOG.warn("fail to get table. db: {}, table id: {}",
db.getFullName(), tableId);
+ BinlogConfig tableBinlogConfig =
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
+ if (tableBinlogConfig == null) {
return null;
}
- long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
+ long ttlSeconds = tableBinlogConfig.getTtlSeconds();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
if (expiredMs < 0) {
return null;
}
+ LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId,
tableId, expiredMs);
// step 2: get tombstoneUpsertBinlog and dummyBinlog
Pair<TBinlog, Long> tombstoneInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 967e129728..e956cf9f0a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4400,8 +4400,9 @@ public class Env {
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(),
table, false);
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(table.getId(),
DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
- ModifyTablePropertyOperationLog info = new
ModifyTablePropertyOperationLog(db.getId(), table.getId(),
- logProperties);
+ ModifyTablePropertyOperationLog info =
+ new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
table.getName(),
+ logProperties);
editLog.logDynamicPartition(info);
}
@@ -4473,9 +4474,9 @@ public class Env {
public void modifyTableDefaultReplicaAllocation(Database db, OlapTable
table, Map<String, String> properties) {
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
table.setReplicaAllocation(properties);
- // log
- ModifyTablePropertyOperationLog info = new
ModifyTablePropertyOperationLog(db.getId(), table.getId(),
- properties);
+ ModifyTablePropertyOperationLog info =
+ new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
table.getName(),
+ properties);
editLog.logModifyReplicationNum(info);
LOG.debug("modify table[{}] replication num to {}", table.getName(),
properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
@@ -4500,8 +4501,9 @@ public class Env {
table.getPartitionInfo().setStoragePolicy(partition.getId(),
tableProperty.getStoragePolicy());
}
- ModifyTablePropertyOperationLog info = new
ModifyTablePropertyOperationLog(db.getId(), table.getId(),
- properties);
+ ModifyTablePropertyOperationLog info =
+ new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
table.getName(),
+ properties);
editLog.logModifyInMemory(info);
}
@@ -4510,8 +4512,9 @@ public class Env {
table.setBinlogConfig(newBinlogConfig);
- ModifyTablePropertyOperationLog info = new
ModifyTablePropertyOperationLog(db.getId(), table.getId(),
- newBinlogConfig.toProperties());
+ ModifyTablePropertyOperationLog info =
+ new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
table.getName(),
+ newBinlogConfig.toProperties());
editLog.logUpdateBinlogConfig(info);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c030940a80..5c095636df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -759,6 +759,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
public void alterDatabaseProperty(AlterDatabasePropertyStmt stmt) throws
DdlException {
String dbName = stmt.getDbName();
Database db = (Database) getDbOrDdlException(dbName);
+ long dbId = db.getId();
Map<String, String> properties = stmt.getProperties();
db.writeLockOrDdlException();
@@ -768,7 +769,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
return;
}
- AlterDatabasePropertyInfo info = new
AlterDatabasePropertyInfo(dbName, properties);
+ AlterDatabasePropertyInfo info = new
AlterDatabasePropertyInfo(dbId, dbName, properties);
Env.getCurrentEnv().getEditLog().logAlterDatabaseProperty(info);
} finally {
db.writeUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
index 56858c86a6..5f1c2bec5b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
@@ -29,6 +29,9 @@ import java.io.IOException;
import java.util.Map;
public class AlterDatabasePropertyInfo implements Writable {
+ @SerializedName(value = "dbId")
+ private long dbId;
+
@SerializedName(value = "dbName")
private String dbName;
@@ -41,11 +44,16 @@ public class AlterDatabasePropertyInfo implements Writable {
this.properties = null;
}
- public AlterDatabasePropertyInfo(String dbName, Map<String, String>
properties) {
+ public AlterDatabasePropertyInfo(long dbId, String dbName, Map<String,
String> properties) {
+ this.dbId = dbId;
this.dbName = dbName;
this.properties = properties;
}
+ public long getDbId() {
+ return dbId;
+ }
+
public String getDbName() {
return dbName;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index eda39bc4f6..9b5a54323e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -750,6 +750,7 @@ public class EditLog {
case OperationType.OP_MODIFY_REPLICATION_NUM: {
ModifyTablePropertyOperationLog log =
(ModifyTablePropertyOperationLog) journal.getData();
env.replayModifyTableProperty(opCode, log);
+ env.getBinlogManager().addModifyTableProperty(log, logId);
break;
}
case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: {
@@ -1039,6 +1040,7 @@ public class EditLog {
LOG.info("replay alter database property: {}",
alterDatabasePropertyInfo);
env.replayAlterDatabaseProperty(alterDatabasePropertyInfo.getDbName(),
alterDatabasePropertyInfo.getProperties());
+
env.getBinlogManager().addAlterDatabaseProperty(alterDatabasePropertyInfo,
logId);
break;
}
case OperationType.OP_GC_BINLOG: {
@@ -1623,24 +1625,30 @@ public class EditLog {
logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo);
}
+ private long logModifyTableProperty(short op,
ModifyTablePropertyOperationLog info) {
+ long logId = logEdit(op, info);
+ Env.getCurrentEnv().getBinlogManager().addModifyTableProperty(info,
logId);
+ return logId;
+ }
+
public void logDynamicPartition(ModifyTablePropertyOperationLog info) {
- logEdit(OperationType.OP_DYNAMIC_PARTITION, info);
+ logModifyTableProperty(OperationType.OP_DYNAMIC_PARTITION, info);
}
- public void logModifyReplicationNum(ModifyTablePropertyOperationLog info) {
- logEdit(OperationType.OP_MODIFY_REPLICATION_NUM, info);
+ public long logModifyReplicationNum(ModifyTablePropertyOperationLog info) {
+ return logModifyTableProperty(OperationType.OP_MODIFY_REPLICATION_NUM,
info);
}
public void
logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog
info) {
logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info);
}
- public void logModifyInMemory(ModifyTablePropertyOperationLog info) {
- logEdit(OperationType.OP_MODIFY_IN_MEMORY, info);
+ public long logModifyInMemory(ModifyTablePropertyOperationLog info) {
+ return logModifyTableProperty(OperationType.OP_MODIFY_IN_MEMORY, info);
}
- public void logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
- logEdit(OperationType.OP_UPDATE_BINLOG_CONFIG, info);
+ public long logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
+ return logModifyTableProperty(OperationType.OP_UPDATE_BINLOG_CONFIG,
info);
}
public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) {
@@ -1829,7 +1837,9 @@ public class EditLog {
}
public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
- return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+ long logId = logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+ Env.getCurrentEnv().getBinlogManager().addAlterDatabaseProperty(log,
logId);
+ return logId;
}
public long logGcBinlog(BinlogGcInfo log) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
index f5a0a5d59d..a782db9f9c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
@@ -35,13 +35,27 @@ public class ModifyTablePropertyOperationLog implements
Writable {
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
+ @SerializedName(value = "tableName")
+ private String tableName;
@SerializedName(value = "properties")
private Map<String, String> properties = new HashMap<>();
+ @SerializedName(value = "sql")
+ private String sql;
- public ModifyTablePropertyOperationLog(long dbId, long tableId,
Map<String, String> properties) {
+ public ModifyTablePropertyOperationLog(long dbId, long tableId, String
tableName, Map<String, String> properties) {
this.dbId = dbId;
this.tableId = tableId;
+ this.tableName = tableName;
this.properties = properties;
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("SET (");
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1); // remove last ','
+ sb.append(")");
+ this.sql = sb.toString();
}
public long getDbId() {
@@ -64,4 +78,8 @@ public class ModifyTablePropertyOperationLog implements
Writable {
public static ModifyTablePropertyOperationLog read(DataInput in) throws
IOException {
return GsonUtils.GSON.fromJson(Text.readString(in),
ModifyTablePropertyOperationLog.class);
}
+
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
index 87ae23470a..bff50dcf76 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
@@ -54,7 +54,7 @@ public class ModifyDynamicPartitionInfoTest {
properties.put(DynamicPartitionProperty.END, "3");
properties.put(DynamicPartitionProperty.PREFIX, "p");
properties.put(DynamicPartitionProperty.BUCKETS, "30");
- ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new
ModifyTablePropertyOperationLog(100L, 200L, properties);
+ ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new
ModifyTablePropertyOperationLog(100L, 200L, "test", properties);
modifyDynamicPartitionInfo.write(out);
out.flush();
out.close();
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 06c3d96c72..0ea96a15f4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -965,6 +965,8 @@ enum TBinlogType {
ALTER_JOB = 5,
MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 6,
DUMMY = 7,
+ ALTER_DATABASE_PROPERTY = 8,
+ MODIFY_TABLE_PROPERTY = 9,
}
struct TBinlog {
@@ -976,6 +978,7 @@ struct TBinlog {
6: optional string data
7: optional i64 belong // belong == -1 if type is not DUMMY
8: optional i64 table_ref // only use for gc
+ 9: optional bool remove_enable_cache
}
struct TGetBinlogResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]