This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a617603be1 [opt](stats) Scale replica of stats table to 3 when it's possible (#22316)
a617603be1 is described below

commit a617603be11c19a7c903db07f4ffcd9173c98384
Author: AKIRA <[email protected]>
AuthorDate: Thu Jul 27 22:42:07 2023 +0800

    [opt](stats) Scale replica of stats table to 3 when it's possible (#22316)
    
    pick #22227 for branch-2.0
---
 .../main/java/org/apache/doris/common/Config.java  | 13 +++--
 .../doris/catalog/InternalSchemaInitializer.java   | 55 ++++++++++++++++++++--
 .../doris/statistics/StatisticConstants.java       | 15 ++----
 .../doris/statistics/util/StatisticsUtil.java      |  6 +--
 .../org/apache/doris/system/SystemInfoService.java |  4 ++
 5 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 970fe7eb5a..7450a1aa8f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1681,13 +1681,6 @@ public class Config extends ConfigBase {
     @ConfField
     public static int statistics_simultaneously_running_task_num = 10;
 
-    /**
-     * Internal table replica num, once set, user should promise the avaible 
BE is greater than this value,
-     * otherwise the statistics related internal table creation would be 
failed.
-     */
-    @ConfField
-    public static int statistic_internal_table_replica_num = 1;
-
     /**
      * if table has too many replicas, Fe occur oom when schema change.
      * 10W replicas is a reasonable value for testing.
@@ -2047,4 +2040,10 @@ public class Config extends ConfigBase {
                     + "and modifying table properties. "
                     + "This config is recommended to be used only in the test 
environment"})
     public static int force_olap_table_replication_num = 0;
+
+    @ConfField
+    public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 
1024;
+
+    @ConfField
+    public static int statistics_sql_parallel_exec_instance_num = 1;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 74d7a5d9fa..39b2326e30 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -30,6 +30,8 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.statistics.util.StatisticsUtil;
@@ -75,6 +77,53 @@ public class InternalSchemaInitializer extends Thread {
             }
         }
         LOG.info("Internal schema is initialized");
+        Optional<Database> op
+                = 
Env.getCurrentEnv().getInternalCatalog().getDb(StatisticConstants.DB_NAME);
+        if (!op.isPresent()) {
+            LOG.warn("Internal DB got deleted!");
+            return;
+        }
+        Database database = op.get();
+        modifyTblReplicaCount(database, StatisticConstants.ANALYSIS_TBL_NAME);
+        modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
+        modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
+    }
+
+    public void modifyTblReplicaCount(Database database, String tblName) {
+        if (!(Config.min_replication_num_per_tablet < 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
+                && Config.max_replication_num_per_tablet >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) {
+            return;
+        }
+        while (true) {
+            if (Env.getCurrentSystemInfo().aliveBECount() >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+                try {
+                    Map<String, String> props = new HashMap<>();
+                    
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, 
"tag.location.default: "
+                            + 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+                    TableIf colStatsTbl = 
StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
+                            StatisticConstants.DB_NAME, tblName);
+                    OlapTable olapTable = (OlapTable) colStatsTbl;
+                    Partition partition = 
olapTable.getPartition(olapTable.getName());
+                    if (partition.getReplicaCount() >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+                        return;
+                    }
+                    try {
+                        colStatsTbl.writeLock();
+                        
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) 
colStatsTbl, props);
+                    } finally {
+                        colStatsTbl.writeUnlock();
+                    }
+                    break;
+                } catch (Throwable t) {
+                    LOG.warn("Failed to scale replica of stats tbl:{} to 3", 
tblName, t);
+                }
+            }
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException t) {
+                // IGNORE
+            }
+        }
     }
 
     private void createTbl() throws UserException {
@@ -122,7 +171,7 @@ public class InternalSchemaInitializer extends Thread {
         Map<String, String> properties = new HashMap<String, String>() {
             {
                 put("replication_num", String.valueOf(
-                        Math.max(Config.statistic_internal_table_replica_num, 
Config.min_replication_num_per_tablet)));
+                        Math.max(1, Config.min_replication_num_per_tablet)));
             }
         };
         CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
@@ -162,7 +211,7 @@ public class InternalSchemaInitializer extends Thread {
         Map<String, String> properties = new HashMap<String, String>() {
             {
                 put("replication_num", String.valueOf(
-                        Math.max(Config.statistic_internal_table_replica_num, 
Config.min_replication_num_per_tablet)));
+                        Math.max(1, Config.min_replication_num_per_tablet)));
             }
         };
         CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
@@ -195,7 +244,7 @@ public class InternalSchemaInitializer extends Thread {
                 StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT, uniqueKeys);
         Map<String, String> properties = new HashMap<String, String>() {
             {
-                put("replication_num", 
String.valueOf(Math.max(Config.statistic_internal_table_replica_num,
+                put("replication_num", String.valueOf(Math.max(1,
                         Config.min_replication_num_per_tablet)));
             }
         };
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index eb9572bb74..32ec2a76e9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -32,14 +32,10 @@ public class StatisticConstants {
 
     public static final String HISTOGRAM_TBL_NAME = "histogram_statistics";
 
-    public static final String ANALYSIS_JOB_TABLE = "analysis_jobs";
-
     public static final int MAX_NAME_LEN = 64;
 
     public static final int ID_LEN = 4096;
 
-    public static final int STATISTIC_PARALLEL_EXEC_INSTANCE_NUM = 1;
-
     public static final int STATISTICS_CACHE_VALID_DURATION_IN_HOURS = 24 * 2;
 
     public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
@@ -51,18 +47,11 @@ public class StatisticConstants {
      */
     public static final int STATISTIC_TABLE_BUCKET_COUNT = 7;
 
-    public static final long STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES = 2L * 1024 
* 1024 * 1024;
-
     /**
      * Determine the execution interval for 'Statistics Table Cleaner' thread.
      */
     public static final int STATISTIC_CLEAN_INTERVAL_IN_HOURS = 24 * 2;
 
-    /**
-     * If analysis job execution time exceeds this time, it would be cancelled.
-     */
-    public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = 
TimeUnit.MINUTES.toMillis(10);
-
     public static final long PRELOAD_RETRY_TIMES = 5;
 
     public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = 
TimeUnit.SECONDS.toMillis(10);
@@ -84,6 +73,10 @@ public class StatisticConstants {
 
     public static List<String> STATISTICS_DB_BLACK_LIST = new ArrayList<>();
 
+    public static final String DB_NAME = SystemInfoService.DEFAULT_CLUSTER + 
":" + FeConstants.INTERNAL_DB_NAME;
+
+    public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;
+
     static {
         STATISTICS_DB_BLACK_LIST.add(SystemInfoService.DEFAULT_CLUSTER
                 + ClusterNamespace.CLUSTER_DELIMITER + 
FeConstants.INTERNAL_DB_NAME);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 6a52bb36b1..b118a7d02f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -165,10 +165,10 @@ public class StatisticsUtil {
         ConnectContext connectContext = new ConnectContext();
         SessionVariable sessionVariable = connectContext.getSessionVariable();
         sessionVariable.internalSession = true;
-        
sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES);
+        
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
         sessionVariable.setEnableInsertStrict(true);
-        sessionVariable.parallelExecInstanceNum = 
StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
-        sessionVariable.parallelPipelineTaskNum = 
StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
+        sessionVariable.parallelExecInstanceNum = 
Config.statistics_sql_parallel_exec_instance_num;
+        sessionVariable.parallelPipelineTaskNum = 
Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.setEnableNereidsPlanner(false);
         sessionVariable.enableProfile = false;
         connectContext.setEnv(Env.getCurrentEnv());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index ace77c1ca8..f423981d2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -984,4 +984,8 @@ public class SystemInfoService {
         }
         return minPipelineExecutorSize;
     }
+
+    public long aliveBECount() {
+        return 
idToBackendRef.values().stream().filter(Backend::isAlive).count();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to