This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a617603be1 [opt](stats) Scale replica of stats table to 3 when it's possible (#22316)
a617603be1 is described below
commit a617603be11c19a7c903db07f4ffcd9173c98384
Author: AKIRA <[email protected]>
AuthorDate: Thu Jul 27 22:42:07 2023 +0800
[opt](stats) Scale replica of stats table to 3 when it's possible (#22316)
pick #22227 for branch-2.0
---
.../main/java/org/apache/doris/common/Config.java | 13 +++--
.../doris/catalog/InternalSchemaInitializer.java | 55 ++++++++++++++++++++--
.../doris/statistics/StatisticConstants.java | 15 ++----
.../doris/statistics/util/StatisticsUtil.java | 6 +--
.../org/apache/doris/system/SystemInfoService.java | 4 ++
5 files changed, 69 insertions(+), 24 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 970fe7eb5a..7450a1aa8f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1681,13 +1681,6 @@ public class Config extends ConfigBase {
@ConfField
public static int statistics_simultaneously_running_task_num = 10;
- /**
- * Internal table replica num, once set, user should promise the avaible BE is greater than this value,
- * otherwise the statistics related internal table creation would be failed.
- */
- @ConfField
- public static int statistic_internal_table_replica_num = 1;
-
/**
* if table has too many replicas, Fe occur oom when schema change.
* 10W replicas is a reasonable value for testing.
@@ -2047,4 +2040,10 @@ public class Config extends ConfigBase {
+ "and modifying table properties. "
+ "This config is recommended to be used only in the test environment"})
public static int force_olap_table_replication_num = 0;
+
+ @ConfField
+ public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 1024;
+
+ @ConfField
+ public static int statistics_sql_parallel_exec_instance_num = 1;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 74d7a5d9fa..39b2326e30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -30,6 +30,8 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -75,6 +77,53 @@ public class InternalSchemaInitializer extends Thread {
}
}
LOG.info("Internal schema is initialized");
+ Optional<Database> op
+ = Env.getCurrentEnv().getInternalCatalog().getDb(StatisticConstants.DB_NAME);
+ if (!op.isPresent()) {
+ LOG.warn("Internal DB got deleted!");
+ return;
+ }
+ Database database = op.get();
+ modifyTblReplicaCount(database, StatisticConstants.ANALYSIS_TBL_NAME);
+ modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
+ modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
+ }
+
+ public void modifyTblReplicaCount(Database database, String tblName) {
+ if (!(Config.min_replication_num_per_tablet < StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
+ && Config.max_replication_num_per_tablet >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) {
+ return;
+ }
+ while (true) {
+ if (Env.getCurrentSystemInfo().aliveBECount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+ try {
+ Map<String, String> props = new HashMap<>();
+ props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: "
+ + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+ TableIf colStatsTbl = StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
+ StatisticConstants.DB_NAME, tblName);
+ OlapTable olapTable = (OlapTable) colStatsTbl;
+ Partition partition = olapTable.getPartition(olapTable.getName());
+ if (partition.getReplicaCount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+ return;
+ }
+ try {
+ colStatsTbl.writeLock();
+ Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props);
+ } finally {
+ colStatsTbl.writeUnlock();
+ }
+ break;
+ } catch (Throwable t) {
+ LOG.warn("Failed to scale replica of stats tbl:{} to 3", tblName, t);
+ }
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException t) {
+ // IGNORE
+ }
+ }
}
private void createTbl() throws UserException {
@@ -122,7 +171,7 @@ public class InternalSchemaInitializer extends Thread {
Map<String, String> properties = new HashMap<String, String>() {
{
put("replication_num", String.valueOf(
- Math.max(Config.statistic_internal_table_replica_num, Config.min_replication_num_per_tablet)));
+ Math.max(1, Config.min_replication_num_per_tablet)));
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
@@ -162,7 +211,7 @@ public class InternalSchemaInitializer extends Thread {
Map<String, String> properties = new HashMap<String, String>() {
{
put("replication_num", String.valueOf(
- Math.max(Config.statistic_internal_table_replica_num, Config.min_replication_num_per_tablet)));
+ Math.max(1, Config.min_replication_num_per_tablet)));
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
@@ -195,7 +244,7 @@ public class InternalSchemaInitializer extends Thread {
StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT, uniqueKeys);
Map<String, String> properties = new HashMap<String, String>() {
{
- put("replication_num", String.valueOf(Math.max(Config.statistic_internal_table_replica_num,
+ put("replication_num", String.valueOf(Math.max(1,
Config.min_replication_num_per_tablet)));
}
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index eb9572bb74..32ec2a76e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -32,14 +32,10 @@ public class StatisticConstants {
public static final String HISTOGRAM_TBL_NAME = "histogram_statistics";
- public static final String ANALYSIS_JOB_TABLE = "analysis_jobs";
-
public static final int MAX_NAME_LEN = 64;
public static final int ID_LEN = 4096;
- public static final int STATISTIC_PARALLEL_EXEC_INSTANCE_NUM = 1;
-
public static final int STATISTICS_CACHE_VALID_DURATION_IN_HOURS = 24 * 2;
public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
@@ -51,18 +47,11 @@ public class StatisticConstants {
*/
public static final int STATISTIC_TABLE_BUCKET_COUNT = 7;
- public static final long STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES = 2L * 1024 * 1024 * 1024;
-
/**
* Determine the execution interval for 'Statistics Table Cleaner' thread.
*/
public static final int STATISTIC_CLEAN_INTERVAL_IN_HOURS = 24 * 2;
- /**
- * If analysis job execution time exceeds this time, it would be cancelled.
- */
- public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10);
-
public static final long PRELOAD_RETRY_TIMES = 5;
public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = TimeUnit.SECONDS.toMillis(10);
@@ -84,6 +73,10 @@ public class StatisticConstants {
public static List<String> STATISTICS_DB_BLACK_LIST = new ArrayList<>();
+ public static final String DB_NAME = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME;
+
+ public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;
+
static {
STATISTICS_DB_BLACK_LIST.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 6a52bb36b1..b118a7d02f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -165,10 +165,10 @@ public class StatisticsUtil {
ConnectContext connectContext = new ConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
- sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES);
+ sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
sessionVariable.setEnableInsertStrict(true);
- sessionVariable.parallelExecInstanceNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
- sessionVariable.parallelPipelineTaskNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
+ sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
+ sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.setEnableNereidsPlanner(false);
sessionVariable.enableProfile = false;
connectContext.setEnv(Env.getCurrentEnv());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index ace77c1ca8..f423981d2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -984,4 +984,8 @@ public class SystemInfoService {
}
return minPipelineExecutorSize;
}
+
+ public long aliveBECount() {
+ return idToBackendRef.values().stream().filter(Backend::isAlive).count();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]