This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bb334de00f [enhancement](load) Change transaction limit from global
level to db level (#15830)
bb334de00f is described below
commit bb334de00ff79cfed2834401fc5b3e2000289605
Author: Henry2SS <[email protected]>
AuthorDate: Wed Feb 8 18:04:26 2023 +0800
[enhancement](load) Change transaction limit from global level to db level
(#15830)
Add transaction size quota for database
Co-authored-by: wuhangze <[email protected]>
---
docs/en/docs/admin-manual/config/fe-config.md | 22 +++++++++++++++
docs/zh-CN/docs/admin-manual/config/fe-config.md | 22 +++++++++++++++
.../main/java/org/apache/doris/common/Config.java | 6 +++++
fe/fe-core/src/main/cup/sql_parser.cup | 4 +++
.../doris/analysis/AlterDatabaseQuotaStmt.java | 7 +++--
.../org/apache/doris/analysis/ShowDataStmt.java | 6 +++++
.../java/org/apache/doris/catalog/Database.java | 31 ++++++++++++++++++++++
.../org/apache/doris/common/proc/DbsProcDir.java | 3 +++
.../org/apache/doris/common/util/ParseUtil.java | 13 +++++++++
.../apache/doris/datasource/InternalCatalog.java | 4 +++
.../doris/load/sync/canal/CanalSyncChannel.java | 5 ++--
.../doris/transaction/DatabaseTransactionMgr.java | 9 +++----
.../doris/analysis/AlterDatabaseQuotaStmtTest.java | 23 ++++++++++++++++
.../apache/doris/common/proc/DbsProcDirTest.java | 8 +++---
14 files changed, 150 insertions(+), 13 deletions(-)
diff --git a/docs/en/docs/admin-manual/config/fe-config.md
b/docs/en/docs/admin-manual/config/fe-config.md
index c23db57284..8586269220 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2593,3 +2593,25 @@ MasterOnly:true
Maximum number of error tablet showed in broker load.
+#### `default_db_max_running_txn_num`
+
+Default:-1
+
+IsMutable:true
+
+MasterOnly:true
+
+Used to set the default database transaction quota size.
+
+The default value setting to -1 means using `max_running_txn_num_per_db`
instead of `default_db_max_running_txn_num`.
+
+To set the quota size of a single database, you can use:
+
+```
+Set the database transaction quota
+ALTER DATABASE db_name SET TRANSACTION QUOTA quota;
+View configuration
+show data (Detail:HELP SHOW DATA)
+```
+
+
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 59a5dcd031..19b3a64e89 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2592,3 +2592,25 @@ SmallFileMgr 中存储的最大文件数
是否为 Master FE 节点独有的配置项:true
broker load job 保存的失败tablet 信息的最大数量
+
+#### `default_db_max_running_txn_num`
+
+默认值:-1
+
+是否可以动态配置:true
+
+是否为 Master FE 节点独有的配置项:true
+
+用于设置默认数据库事务配额大小。
+
+默认值设置为 -1 意味着使用 `max_running_txn_num_per_db` 而不是
`default_db_max_running_txn_num`。
+
+设置单个数据库的配额大小可以使用:
+
+```
+设置数据库事务量配额
+ALTER DATABASE db_name SET TRANSACTION QUOTA quota;
+查看配置
+show data (其他用法:HELP SHOW DATA)
+```
+
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5ea73a3ddb..6edbc5a160 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1985,5 +1985,11 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static int pull_request_id = 0;
+
+ /**
+ * Used to set default db transaction quota num.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static long default_db_max_running_txn_num = -1;
}
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 9e873edca4..5f727df186 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1271,6 +1271,10 @@ alter_stmt ::=
{:
RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.REPLICA,
String.valueOf(number));
:}
+ | KW_ALTER KW_DATABASE ident:dbName KW_SET KW_TRANSACTION KW_QUOTA
INTEGER_LITERAL:number
+ {:
+ RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.TRANSACTION,
String.valueOf(number));
+ :}
| KW_ALTER KW_DATABASE ident:dbName KW_RENAME ident:newDbName
{:
RESULT = new AlterDatabaseRename(dbName, newDbName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
index c2cb0fbb0a..90c01016e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
@@ -37,7 +37,8 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
public enum QuotaType {
NONE,
DATA,
- REPLICA
+ REPLICA,
+ TRANSACTION
}
public AlterDatabaseQuotaStmt(String dbName, QuotaType quotaType, String
quotaValue) {
@@ -75,6 +76,8 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
quota = ParseUtil.analyzeDataVolumn(quotaValue);
} else if (quotaType == QuotaType.REPLICA) {
quota = ParseUtil.analyzeReplicaNumber(quotaValue);
+ } else if (quotaType == QuotaType.TRANSACTION) {
+ quota = ParseUtil.analyzeTransactionNumber(quotaValue);
}
}
@@ -82,7 +85,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
@Override
public String toSql() {
return "ALTER DATABASE " + dbName + " SET "
- + (quotaType == QuotaType.DATA ? "DATA" : "REPLICA")
+ + quotaType.name()
+ " QUOTA " + quotaValue;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java
index ce19a53135..53b2a1fc1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java
@@ -212,6 +212,12 @@ public class ShowDataStmt extends ShowStmt {
+ leftPair.second;
List<String> leftRow = Arrays.asList("Left", readableLeft,
String.valueOf(replicaCountLeft));
totalRows.add(leftRow);
+
+ // txn quota
+ long txnQuota = db.getTransactionQuotaSize();
+ List<String> transactionQuotaList = Arrays.asList("Transaction
Quota",
+ String.valueOf(txnQuota), String.valueOf(txnQuota));
+ totalRows.add(transactionQuotaList);
} finally {
db.readUnlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index cf7ac4e4ad..1fc88c34ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -71,6 +71,8 @@ import java.util.stream.Collectors;
public class Database extends MetaObject implements Writable,
DatabaseIf<Table> {
private static final Logger LOG = LogManager.getLogger(Database.class);
+ private static final String TRANSACTION_QUOTA_SIZE =
"transactionQuotaSize";
+
private long id;
private volatile String fullQualifiedName;
private String clusterName;
@@ -91,6 +93,8 @@ public class Database extends MetaObject implements Writable,
DatabaseIf<Table>
private volatile long replicaQuotaSize;
+ private volatile long transactionQuotaSize;
+
private volatile boolean isDropped;
public enum DbState {
@@ -118,6 +122,9 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>
this.lowerCaseToTableName = Maps.newConcurrentMap();
this.dataQuotaBytes = Config.default_db_data_quota_bytes;
this.replicaQuotaSize = Config.default_db_replica_quota_size;
+ this.transactionQuotaSize = Config.default_db_max_running_txn_num ==
-1L
+ ? Config.max_running_txn_num_per_db
+ : Config.default_db_max_running_txn_num;
this.dbState = DbState.NORMAL;
this.attachDbName = "";
this.clusterName = "";
@@ -213,6 +220,19 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>
this.replicaQuotaSize = newQuota;
}
+ public void setTransactionQuotaSize(long newQuota) {
+ writeLock();
+ try {
+ Preconditions.checkArgument(newQuota >= 0L);
+ LOG.info("database[{}] try to set transaction quota from {} to {}",
+ fullQualifiedName, transactionQuotaSize, newQuota);
+ this.transactionQuotaSize = newQuota;
+ this.dbProperties.put(TRANSACTION_QUOTA_SIZE,
String.valueOf(transactionQuotaSize));
+ } finally {
+ writeUnlock();
+ }
+ }
+
public long getDataQuota() {
return dataQuotaBytes;
}
@@ -221,6 +241,10 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>
return replicaQuotaSize;
}
+ public long getTransactionQuotaSize() {
+ return transactionQuotaSize;
+ }
+
public DatabaseProperty getDbProperties() {
return dbProperties;
}
@@ -603,6 +627,13 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_105) {
dbProperties = DatabaseProperty.read(in);
+ String txnQuotaStr =
dbProperties.getOrDefault(TRANSACTION_QUOTA_SIZE,
+ String.valueOf(Config.max_running_txn_num_per_db));
+ transactionQuotaSize = Long.parseLong(txnQuotaStr);
+ } else {
+ transactionQuotaSize = Config.default_db_max_running_txn_num == -1L
+ ? Config.max_running_txn_num_per_db
+ : Config.default_db_max_running_txn_num;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java
index 47029c9e92..431c618a6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java
@@ -43,6 +43,7 @@ public class DbsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("DbId").add("DbName").add("TableNum").add("Size").add("Quota")
.add("LastConsistencyCheckTime").add("ReplicaCount").add("ReplicaQuota")
+ .add("TransactionQuota")
.build();
private Env env;
@@ -114,11 +115,13 @@ public class DbsProcDir implements ProcDirInterface {
((Database) db).getLastCheckTime()) :
FeConstants.null_string;
long replicaCount = (db instanceof Database) ? ((Database)
db).getReplicaCountWithLock() : 0;
long replicaQuota = (db instanceof Database) ? ((Database)
db).getReplicaQuota() : 0;
+ long transactionQuota = (db instanceof Database) ? ((Database)
db).getTransactionQuotaSize() : 0;
dbInfo.add(readableUsedQuota);
dbInfo.add(readableQuota);
dbInfo.add(lastCheckTime);
dbInfo.add(replicaCount);
dbInfo.add(replicaQuota);
+ dbInfo.add(transactionQuota);
} finally {
db.readUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
index ed3c4947fe..c47753d2d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
@@ -83,4 +83,17 @@ public class ParseUtil {
return replicaNumber;
}
+ public static long analyzeTransactionNumber(String transactionNumberStr)
throws AnalysisException {
+ long transactionNumber = 0;
+ try {
+ transactionNumber = Long.parseLong(transactionNumberStr);
+ } catch (NumberFormatException nfe) {
+ throw new AnalysisException("invalid data volumn:" +
transactionNumberStr);
+ }
+ if (transactionNumber <= 0L) {
+ throw new AnalysisException("Transaction quota size must larger
than 0");
+ }
+ return transactionNumber;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 31455d814c..6a9392ffee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -775,6 +775,8 @@ public class InternalCatalog implements CatalogIf<Database>
{
db.setDataQuota(stmt.getQuota());
} else if (quotaType == QuotaType.REPLICA) {
db.setReplicaQuota(stmt.getQuota());
+ } else if (quotaType == QuotaType.TRANSACTION) {
+ db.setTransactionQuotaSize(stmt.getQuota());
}
long quota = stmt.getQuota();
DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota,
quotaType);
@@ -792,6 +794,8 @@ public class InternalCatalog implements CatalogIf<Database>
{
db.setDataQuota(quota);
} else if (quotaType == QuotaType.REPLICA) {
db.setReplicaQuota(quota);
+ } else if (quotaType == QuotaType.TRANSACTION) {
+ db.setTransactionQuotaSize(quota);
}
} finally {
db.writeUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 46e4f8df13..5ad6919551 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -124,7 +124,8 @@ public class CanalSyncChannel extends SyncChannel {
String targetColumn = Joiner.on(",").join(columns) + "," +
DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr =
Env.getCurrentGlobalTransactionMgr();
DatabaseTransactionMgr databaseTransactionMgr =
globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
- if (databaseTransactionMgr.getRunningTxnNums() <
Config.max_running_txn_num_per_db) {
+ long txnLimit = db.getTransactionQuotaSize();
+ if (databaseTransactionMgr.getRunningTxnNums() < txnLimit) {
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
@@ -185,7 +186,7 @@ public class CanalSyncChannel extends SyncChannel {
} else {
String failMsg = "current running txns on db " + db.getId() +
" is "
+ databaseTransactionMgr.getRunningTxnNums()
- + ", larger than limit " +
Config.max_running_txn_num_per_db;
+ + ", larger than limit " + txnLimit;
LOG.warn(failMsg);
throw new BeginTransactionException(failMsg);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index dec2a4298f..e250602214 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -31,7 +31,6 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -126,7 +125,6 @@ public class DatabaseTransactionMgr {
// it must exists in dbIdToTxnLabels, and vice versa
private final Map<String, Set<Long>> labelToTxnIds = Maps.newHashMap();
-
// count the number of running txns of database, except for the routine
load txn
private volatile int runningTxnNums = 0;
private volatile int runningTxnReplicaNums = 0;
@@ -1523,7 +1521,7 @@ public class DatabaseTransactionMgr {
}
protected void
checkRunningTxnExceedLimit(TransactionState.LoadJobSourceType sourceType)
- throws BeginTransactionException {
+ throws BeginTransactionException, MetaNotFoundException {
switch (sourceType) {
case ROUTINE_LOAD_TASK:
// no need to check limit for routine load task:
@@ -1532,9 +1530,10 @@ public class DatabaseTransactionMgr {
// load, and other txn may not be able to submitted.
break;
default:
- if (runningTxnNums >= Config.max_running_txn_num_per_db) {
+ long txnQuota =
env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize();
+ if (runningTxnNums >= txnQuota) {
throw new BeginTransactionException("current running txns
on db " + dbId + " is "
- + runningTxnNums + ", larger than limit " +
Config.max_running_txn_num_per_db);
+ + runningTxnNums + ", larger than limit " +
txnQuota);
}
break;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java
index 73a7ed6cce..19b2365e25 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java
@@ -154,4 +154,27 @@ public class AlterDatabaseQuotaStmtTest {
Assert.fail("No exception throws.");
}
+ @Test
+ public void testNormalAlterDatabaseTransactionQuotaStmt() throws
AnalysisException, UserException {
+ long quotaSize = 10;
+ AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb",
QuotaType.TRANSACTION, String.valueOf(quotaSize));
+ stmt.analyze(analyzer);
+ String expectedSql = "ALTER DATABASE testCluster:testDb SET
TRANSACTION QUOTA 10";
+ Assert.assertEquals(expectedSql, stmt.toSql());
+ Assert.assertEquals(quotaSize, stmt.getQuota());
+ }
+
+ @Test(expected = AnalysisException.class)
+ public void testTransactionMinusQuota() throws AnalysisException,
UserException {
+ AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb",
QuotaType.TRANSACTION, "-100");
+ stmt.analyze(analyzer);
+ Assert.fail("No exception throws.");
+ }
+
+ @Test(expected = AnalysisException.class)
+ public void testtransactionInvalidQuantity() throws AnalysisException,
UserException {
+ AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb",
QuotaType.TRANSACTION, "invalid_100_quota");
+ stmt.analyze(analyzer);
+ Assert.fail("No exception throws.");
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java
index 40dc802e63..6532d36c33 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java
@@ -193,11 +193,11 @@ public class DbsProcDirTest {
Assert.assertTrue(result instanceof BaseProcResult);
Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum",
"Size", "Quota",
- "LastConsistencyCheckTime", "ReplicaCount",
"ReplicaQuota"),
+ "LastConsistencyCheckTime", "ReplicaCount",
"ReplicaQuota", "TransactionQuota"),
result.getColumnNames());
List<List<String>> rows = Lists.newArrayList();
- rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(),
"0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824"));
- rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(),
"0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824"));
+ rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(),
"0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824",
"100"));
+ rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(),
"0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824",
"100"));
Assert.assertEquals(rows, result.getRows());
}
@@ -228,7 +228,7 @@ public class DbsProcDirTest {
dir = new DbsProcDir(env, catalog);
result = dir.fetchResult();
Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum",
"Size", "Quota",
- "LastConsistencyCheckTime", "ReplicaCount",
"ReplicaQuota"),
+ "LastConsistencyCheckTime", "ReplicaCount",
"ReplicaQuota", "TransactionQuota"),
result.getColumnNames());
List<List<String>> rows = Lists.newArrayList();
Assert.assertEquals(rows, result.getRows());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]