This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c5e4337da50 [fix](audit) add workload_group to audit log table (#30470)
c5e4337da50 is described below
commit c5e4337da50a668d0a65d1cdaaa20b04fef3fa7c
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Jan 30 17:12:09 2024 +0800
[fix](audit) add workload_group to audit log table (#30470)
1. Add the missing workload_group column to the audit table
2. Extract the definition of internal schema's tables into a new class
3. Fix a bug where the audit loader had no authorization to load data into
audit_table, introduced in #29790
4. Fix a bug where audit_log could not be changed to 3 replicas because it is
a partitioned table
---
.../org/apache/doris/analysis/CreateTableStmt.java | 2 +-
.../org/apache/doris/catalog/InternalSchema.java | 113 +++++++++++++++++
.../doris/catalog/InternalSchemaInitializer.java | 133 ++++++++-------------
.../doris/plugin/audit/AuditLoaderPlugin.java | 1 +
.../doris/plugin/audit/AuditStreamLoader.java | 8 +-
.../apache/doris/service/FrontendServiceImpl.java | 8 +-
.../org/apache/doris/system/SystemInfoService.java | 13 ++
.../doris/transaction/DatabaseTransactionMgr.java | 4 +-
.../apache/doris/transaction/TransactionState.java | 3 +
.../doris/alter/InternalSchemaAlterTest.java | 74 ++++++++++++
.../doris/statistics/AnalysisTaskExecutorTest.java | 2 +-
.../org/apache/doris/statistics/AnalyzeTest.java | 2 +-
.../apache/doris/utframe/TestWithFeService.java | 4 +
13 files changed, 277 insertions(+), 90 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index b1f84326b44..e60f53f157b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -98,7 +98,7 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("broker");
}
- // if auto bucket auto bucket enable, rewrite distribution bucket num &&
+ // if auto bucket enable, rewrite distribution bucket num &&
// set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
private static Map<String, String>
maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
Map<String, String> properties) throws AnalysisException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
new file mode 100644
index 00000000000..7d348761704
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.common.UserException;
+import org.apache.doris.plugin.audit.AuditLoaderPlugin;
+import org.apache.doris.statistics.StatisticConstants;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InternalSchema {
+
+ // Do not use the original schema directly, because it may be modified by
create table operation.
+ public static final List<ColumnDef> COL_STATS_SCHEMA;
+ public static final List<ColumnDef> HISTO_STATS_SCHEMA;
+ public static final List<ColumnDef> AUDIT_SCHEMA;
+
+ static {
+ // column statistics table
+ COL_STATS_SCHEMA = new ArrayList<>();
+ COL_STATS_SCHEMA.add(new ColumnDef("id",
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
+ COL_STATS_SCHEMA.add(new ColumnDef("catalog_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ COL_STATS_SCHEMA.add(new ColumnDef("db_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ COL_STATS_SCHEMA.add(new ColumnDef("tbl_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ COL_STATS_SCHEMA.add(new ColumnDef("idx_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ COL_STATS_SCHEMA.add(new ColumnDef("col_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ COL_STATS_SCHEMA.add(new ColumnDef("part_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("count",
TypeDef.create(PrimitiveType.BIGINT), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("ndv",
TypeDef.create(PrimitiveType.BIGINT), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("null_count",
TypeDef.create(PrimitiveType.BIGINT), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("min",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("max",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("data_size_in_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
+ COL_STATS_SCHEMA.add(new ColumnDef("update_time",
TypeDef.create(PrimitiveType.DATETIME)));
+
+ // histogram_statistics table
+ HISTO_STATS_SCHEMA = new ArrayList<>();
+ HISTO_STATS_SCHEMA.add(new ColumnDef("id",
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("catalog_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("db_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("tbl_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("idx_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("col_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("sample_rate",
TypeDef.create(PrimitiveType.DOUBLE)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("buckets",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
+ HISTO_STATS_SCHEMA.add(new ColumnDef("update_time",
TypeDef.create(PrimitiveType.DATETIME)));
+
+ // audit table
+ AUDIT_SCHEMA = new ArrayList<>();
+ AUDIT_SCHEMA.add(new ColumnDef("query_id", TypeDef.createVarchar(48),
true));
+ AUDIT_SCHEMA.add(new ColumnDef("time",
TypeDef.create(PrimitiveType.DATETIME), true));
+ AUDIT_SCHEMA.add(new ColumnDef("client_ip",
TypeDef.createVarchar(128), true));
+ AUDIT_SCHEMA.add(new ColumnDef("user", TypeDef.createVarchar(128),
true));
+ AUDIT_SCHEMA.add(new ColumnDef("catalog", TypeDef.createVarchar(128),
true));
+ AUDIT_SCHEMA.add(new ColumnDef("db", TypeDef.createVarchar(128),
true));
+ AUDIT_SCHEMA.add(new ColumnDef("state", TypeDef.createVarchar(128),
true));
+ AUDIT_SCHEMA.add(new ColumnDef("error_code",
TypeDef.create(PrimitiveType.INT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("error_message",
TypeDef.create(PrimitiveType.STRING), true));
+ AUDIT_SCHEMA.add(new ColumnDef("query_time",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("scan_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("scan_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("return_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("stmt_id",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("is_query",
TypeDef.create(PrimitiveType.TINYINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("frontend_ip",
TypeDef.createVarchar(128), true));
+ AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128),
true));
+ AUDIT_SCHEMA.add(new ColumnDef("sql_digest",
TypeDef.createVarchar(128), true));
+ AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("workload_group",
TypeDef.create(PrimitiveType.STRING), true));
+ AUDIT_SCHEMA.add(new ColumnDef("stmt",
TypeDef.create(PrimitiveType.STRING), true));
+ }
+
+ // Get a copied schema for the given internal table (statistics, histogram or audit)
+ // Do not use the original schema directly, because it may be modified by
create table operation.
+ public static List<ColumnDef> getCopiedSchema(String tblName) throws
UserException {
+ List<ColumnDef> schema;
+ if (tblName.equals(StatisticConstants.STATISTIC_TBL_NAME)) {
+ schema = COL_STATS_SCHEMA;
+ } else if (tblName.equals(StatisticConstants.HISTOGRAM_TBL_NAME)) {
+ schema = HISTO_STATS_SCHEMA;
+ } else if (tblName.equals(AuditLoaderPlugin.AUDIT_LOG_TABLE)) {
+ schema = AUDIT_SCHEMA;
+ } else {
+ throw new UserException("Unknown internal table name: " + tblName);
+ }
+ List<ColumnDef> copiedSchema = Lists.newArrayList();
+ for (ColumnDef columnDef : schema) {
+ copiedSchema.add(new ColumnDef(columnDef.getName(),
columnDef.getTypeDef(), columnDef.isAllowNull()));
+ }
+ return copiedSchema;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index d53520b133c..169e2fac80b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -17,17 +17,18 @@
package org.apache.doris.catalog;
-import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.AlterClause;
+import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.ModifyPartitionClause;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TypeDef;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@@ -56,33 +57,6 @@ public class InternalSchemaInitializer extends Thread {
private static final Logger LOG =
LogManager.getLogger(InternalSchemaInitializer.class);
- public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;
-
- static {
- AUDIT_TABLE_COLUMNS = new ArrayList<>();
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id",
TypeDef.createVarchar(48), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("time",
TypeDef.create(PrimitiveType.DATETIME), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("user",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("db",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("state",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code",
TypeDef.create(PrimitiveType.INT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message",
TypeDef.create(PrimitiveType.STRING), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query",
TypeDef.create(PrimitiveType.TINYINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest",
TypeDef.createVarchar(128), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
- AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt",
TypeDef.create(PrimitiveType.STRING), true));
- }
-
public void run() {
if (!FeConstants.enableInternalSchemaDb) {
return;
@@ -97,7 +71,7 @@ public class InternalSchemaInitializer extends Thread {
}
Thread.currentThread()
.join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS *
1000L);
- createDB();
+ createDb();
createTbl();
} catch (Throwable e) {
LOG.warn("Statistics storage initiated failed, will try again
later", e);
@@ -116,29 +90,51 @@ public class InternalSchemaInitializer extends Thread {
modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
}
- public void modifyTblReplicaCount(Database database, String tblName) {
+ @VisibleForTesting
+ public static void modifyTblReplicaCount(Database database, String
tblName) {
if (!(Config.min_replication_num_per_tablet <
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
&& Config.max_replication_num_per_tablet >=
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) {
return;
}
while (true) {
- if (Env.getCurrentSystemInfo().aliveBECount() >=
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+ int backendNum =
Env.getCurrentSystemInfo().getBackendNumFromDiffHosts(true);
+ if (FeConstants.runningUnitTest) {
+ backendNum =
Env.getCurrentSystemInfo().getAllBackendIds().size();
+ }
+ if (backendNum >=
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
try {
- Map<String, String> props = new HashMap<>();
-
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
"tag.location.default: "
- +
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
- TableIf colStatsTbl =
StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
+ OlapTable tbl = (OlapTable)
StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
StatisticConstants.DB_NAME, tblName);
- OlapTable olapTable = (OlapTable) colStatsTbl;
- if
(olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum()
- >=
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
- return;
- }
- colStatsTbl.writeLock();
+ tbl.writeLock();
try {
-
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable)
colStatsTbl, props);
+ if
(tbl.getTableProperty().getReplicaAllocation().getTotalReplicaNum()
+ >=
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+ return;
+ }
+ if (!tbl.isPartitionedTable()) {
+ Map<String, String> props = new HashMap<>();
+
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
"tag.location.default: "
+ +
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+
Env.getCurrentEnv().modifyTableReplicaAllocation(database, tbl, props);
+ } else {
+ TableName tableName = new
TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
+ StatisticConstants.DB_NAME, tbl.getName());
+ // 1. modify table's default replica num
+ Map<String, String> props = new HashMap<>();
+ props.put("default." +
PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
+ "" +
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+
Env.getCurrentEnv().modifyTableDefaultReplicaAllocation(database, tbl, props);
+ // 2. modify each partition's replica num
+ List<AlterClause> clauses = Lists.newArrayList();
+ props.clear();
+
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
+ "" +
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+
clauses.add(ModifyPartitionClause.createStarClause(props, false));
+ AlterTableStmt alter = new
AlterTableStmt(tableName, clauses);
+ Env.getCurrentEnv().alterTable(alter);
+ }
} finally {
- colStatsTbl.writeUnlock();
+ tbl.writeUnlock();
}
break;
} catch (Throwable t) {
@@ -153,7 +149,8 @@ public class InternalSchemaInitializer extends Thread {
}
}
- private void createTbl() throws UserException {
+ @VisibleForTesting
+ public static void createTbl() throws UserException {
// statistics
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
@@ -162,7 +159,7 @@ public class InternalSchemaInitializer extends Thread {
}
@VisibleForTesting
- public static void createDB() {
+ public static void createDb() {
CreateDbStmt createDbStmt = new CreateDbStmt(true,
FeConstants.INTERNAL_DB_NAME,
null);
try {
@@ -173,27 +170,9 @@ public class InternalSchemaInitializer extends Thread {
}
}
- @VisibleForTesting
- public CreateTableStmt buildStatisticsTblStmt() throws UserException {
+ private static CreateTableStmt buildStatisticsTblStmt() throws
UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME,
StatisticConstants.STATISTIC_TBL_NAME);
- List<ColumnDef> columnDefs = new ArrayList<>();
- columnDefs.add(new ColumnDef("id",
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
- columnDefs.add(new ColumnDef("catalog_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("db_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("tbl_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("idx_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("col_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- ColumnDef partId = new ColumnDef("part_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN));
- partId.setAllowNull(true);
- columnDefs.add(partId);
- columnDefs.add(new ColumnDef("count",
TypeDef.create(PrimitiveType.BIGINT), true));
- columnDefs.add(new ColumnDef("ndv",
TypeDef.create(PrimitiveType.BIGINT), true));
- columnDefs.add(new ColumnDef("null_count",
TypeDef.create(PrimitiveType.BIGINT), true));
- columnDefs.add(new ColumnDef("min",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
- columnDefs.add(new ColumnDef("max",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
- columnDefs.add(new ColumnDef("data_size_in_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
- columnDefs.add(new ColumnDef("update_time",
TypeDef.create(PrimitiveType.DATETIME)));
String engineName = "olap";
ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id",
"db_id", "tbl_id", "idx_id", "col_id", "part_id");
@@ -207,26 +186,16 @@ public class InternalSchemaInitializer extends Thread {
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
- tableName, columnDefs, engineName, keysDesc, null,
distributionDesc,
+ tableName,
InternalSchema.getCopiedSchema(StatisticConstants.STATISTIC_TBL_NAME),
+ engineName, keysDesc, null, distributionDesc,
properties, null, "Doris internal statistics table, DO NOT
MODIFY IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;
}
- @VisibleForTesting
- public CreateTableStmt buildHistogramTblStmt() throws UserException {
+ private static CreateTableStmt buildHistogramTblStmt() throws
UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME,
StatisticConstants.HISTOGRAM_TBL_NAME);
- List<ColumnDef> columnDefs = new ArrayList<>();
- columnDefs.add(new ColumnDef("id",
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
- columnDefs.add(new ColumnDef("catalog_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("db_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("tbl_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("idx_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("col_id",
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
- columnDefs.add(new ColumnDef("sample_rate",
TypeDef.create(PrimitiveType.DOUBLE)));
- columnDefs.add(new ColumnDef("buckets",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
- columnDefs.add(new ColumnDef("update_time",
TypeDef.create(PrimitiveType.DATETIME)));
String engineName = "olap";
ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id",
"db_id", "tbl_id", "idx_id", "col_id");
@@ -240,13 +209,14 @@ public class InternalSchemaInitializer extends Thread {
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
- tableName, columnDefs, engineName, keysDesc, null,
distributionDesc,
+ tableName,
InternalSchema.getCopiedSchema(StatisticConstants.HISTOGRAM_TBL_NAME),
+ engineName, keysDesc, null, distributionDesc,
properties, null, "Doris internal statistics table, DO NOT
MODIFY IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;
}
- private CreateTableStmt buildAuditTblStmt() throws UserException {
+ private static CreateTableStmt buildAuditTblStmt() throws UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME,
AuditLoaderPlugin.AUDIT_LOG_TABLE);
@@ -271,7 +241,8 @@ public class InternalSchemaInitializer extends Thread {
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
- tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc,
partitionDesc, distributionDesc,
+ tableName,
InternalSchema.getCopiedSchema(AuditLoaderPlugin.AUDIT_LOG_TABLE),
+ engineName, keysDesc, partitionDesc, distributionDesc,
properties, null, "Doris internal audit table, DO NOT MODIFY
IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 533f50f062d..148fc460d5a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -161,6 +161,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
logBuffer.append(event.sqlHash).append("\t");
logBuffer.append(event.sqlDigest).append("\t");
logBuffer.append(event.peakMemoryBytes).append("\t");
+ logBuffer.append(event.workloadGroup).append("\t");
// trim the query to avoid too long
// use `getBytes().length` to get real byte length
String stmt = truncateByBytes(event.stmt).replace("\n", " ")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
index 07175478275..72b046f015a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -17,7 +17,7 @@
package org.apache.doris.plugin.audit;
-import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.catalog.InternalSchema;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@@ -48,7 +48,7 @@ public class AuditStreamLoader {
this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE;
this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db,
auditLogTbl);
// currently, FE identity is FE's IP, so we replace the "." in IP to
make it suitable for label
- this.feIdentity = hostPort.replaceAll("\\.", "_");
+ this.feIdentity = hostPort.replaceAll("\\.", "_").replaceAll(":", "_");
}
private HttpURLConnection getConnection(String urlStr, String label,
String clusterToken) throws IOException {
@@ -63,7 +63,7 @@ public class AuditStreamLoader {
conn.addRequestProperty("label", label);
conn.addRequestProperty("max_filter_ratio", "1.0");
conn.addRequestProperty("columns",
- InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c
-> c.getName()).collect(
+ InternalSchema.AUDIT_SCHEMA.stream().map(c ->
c.getName()).collect(
Collectors.joining(",")));
conn.setDoOutput(true);
conn.setDoInput(true);
@@ -78,7 +78,7 @@ public class AuditStreamLoader {
sb.append("-H \"").append("Content-Type\":").append("\"text/plain;
charset=UTF-8\" \\\n ");
sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n
");
sb.append("-H \"").append("columns\":")
- .append("\"" +
InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c ->
c.getName()).collect(
+ .append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c ->
c.getName()).collect(
Collectors.joining(",")) + "\" \\\n ");
sb.append("\"").append(conn.getURL()).append("\"");
return sb.toString();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 18623c33f7a..51067f2b49e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1091,6 +1091,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
checkSingleTablePasswordAndPrivs(request.getUser(),
request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
+ } else {
+ checkToken(request.getToken());
}
// check label
@@ -1112,9 +1114,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl,
TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
+ TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE,
clientIp);
+ if (request.isSetToken()) {
+ txnCoord.isFromInternal = true;
+ }
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()),
request.getLabel(), request.getRequestId(),
- new TxnCoordinator(TxnSourceType.BE, clientIp),
+ txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1,
timeoutSecond);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
result.setTxnId(txnId).setDbId(db.getId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index d4140c2b01f..f45086f239d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -565,6 +565,19 @@ public class SystemInfoService {
return idToBackendRef.values().stream().filter(backend ->
backend.isComputeNode()).collect(Collectors.toList());
}
+ // return the number of distinct hosts among the backends (alive ones only if aliveOnly is true)
+ public int getBackendNumFromDiffHosts(boolean aliveOnly) {
+ Set<String> hosts = Sets.newHashSet();
+ ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ for (Backend backend : idToBackend.values()) {
+ if (aliveOnly && !backend.isAlive()) {
+ continue;
+ }
+ hosts.add(backend.getHost());
+ }
+ return hosts.size();
+ }
+
class BeIdComparator implements Comparator<Backend> {
public int compare(Backend a, Backend b) {
return (int) (a.getId() - b.getId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9d7ce073880..e772b28ade4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -343,7 +343,9 @@ public class DatabaseTransactionMgr {
throws DuplicatedRequestException, LabelAlreadyUsedException,
BeginTransactionException,
AnalysisException, QuotaExceedException, MetaNotFoundException {
Database db = env.getInternalCatalog().getDbOrMetaException(dbId);
- InternalDatabaseUtil.checkDatabase(db.getFullName(),
ConnectContext.get());
+ if (!coordinator.isFromInternal) {
+ InternalDatabaseUtil.checkDatabase(db.getFullName(),
ConnectContext.get());
+ }
checkDatabaseDataQuota();
Preconditions.checkNotNull(coordinator);
Preconditions.checkNotNull(label);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 45c40f964b0..7f12ab99921 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -164,6 +164,9 @@ public class TransactionState implements Writable {
public TxnSourceType sourceType;
@SerializedName(value = "ip")
public String ip;
+ // True if this txn is created by the system (such as writing data to the audit
table)
+ @SerializedName(value = "ii")
+ public boolean isFromInternal = false;
public TxnCoordinator() {
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
new file mode 100644
index 00000000000..2898e3a1c35
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.alter;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.plugin.audit.AuditLoaderPlugin;
+import org.apache.doris.statistics.StatisticConstants;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class InternalSchemaAlterTest extends TestWithFeService {
+
+ @Override
+ protected int backendNum() {
+ return 3;
+ }
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ InternalSchemaInitializer.createDb();
+ InternalSchemaInitializer.createTbl();
+ Config.allow_replica_on_same_host = true;
+ FeConstants.runningUnitTest = true;
+ }
+
+ @Test
+ public void testModifyTblReplicaCount() throws AnalysisException {
+ Database db = Env.getCurrentEnv().getCatalogMgr()
+
.getInternalCatalog().getDbNullable(FeConstants.INTERNAL_DB_NAME);
+ InternalSchemaInitializer.modifyTblReplicaCount(db,
StatisticConstants.STATISTIC_TBL_NAME);
+ InternalSchemaInitializer.modifyTblReplicaCount(db,
StatisticConstants.HISTOGRAM_TBL_NAME);
+ InternalSchemaInitializer.modifyTblReplicaCount(db,
AuditLoaderPlugin.AUDIT_LOG_TABLE);
+
+ checkReplicationNum(db, StatisticConstants.STATISTIC_TBL_NAME);
+ checkReplicationNum(db, StatisticConstants.HISTOGRAM_TBL_NAME);
+ checkReplicationNum(db, AuditLoaderPlugin.AUDIT_LOG_TABLE);
+ }
+
+ private void checkReplicationNum(Database db, String tblName) throws
AnalysisException {
+ OlapTable olapTable = db.getOlapTableOrAnalysisException(tblName);
+ PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ Assertions.assertEquals((short) 3,
olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum(),
+ tblName);
+ for (Partition partition : olapTable.getPartitions()) {
+ Assertions.assertEquals((short) 3,
+
partitionInfo.getReplicaAllocation(partition.getId()).getTotalReplicaNum());
+ }
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 1e187d74671..b17ba3e68db 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -54,7 +54,7 @@ public class AnalysisTaskExecutorTest extends
TestWithFeService {
@Override
protected void runBeforeAll() throws Exception {
try {
- InternalSchemaInitializer.createDB();
+ InternalSchemaInitializer.createDb();
createDatabase("analysis_job_test");
connectContext.setDatabase("analysis_job_test");
createTable("CREATE TABLE t1 (col1 int not null, col2 int not
null, col3 int not null)\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
index 85fa42071e9..483cd3c0326 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
@@ -55,7 +55,7 @@ public class AnalyzeTest extends TestWithFeService {
@Override
protected void runBeforeAll() throws Exception {
try {
- InternalSchemaInitializer.createDB();
+ InternalSchemaInitializer.createDb();
createDatabase("analysis_job_test");
connectContext.setDatabase("analysis_job_test");
createTable("CREATE TABLE t1 (col1 int not null, col2 int not
null, col3 int not null)\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 1f57040d305..65070dd504b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -188,6 +188,10 @@ public abstract class TestWithFeService {
return 1;
}
+ protected boolean needDiffHost() {
+ return false;
+ }
+
// Help to create a mocked ConnectContext.
public static ConnectContext createDefaultCtx() throws IOException {
return createCtx(UserIdentity.ROOT, "127.0.0.1");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]