This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e5000c708e [feature](statistics) Support for collecting statistics on 
materialized view (#14676)
e5000c708e is described below

commit e5000c708edb212bd747806bdfa79c6325840a12
Author: Kikyou1997 <[email protected]>
AuthorDate: Thu Dec 1 22:34:13 2022 +0800

    [feature](statistics) Support for collecting statistics on materialized 
view (#14676)
    
    1. Map multiple tasks to one Job
    2. Remove the code for analyzing the whole default db, since this feature is 
not available, would create too many tasks, and the related code is confusing
    3. Support analyzing materialized views
    4. Abstract the common logic into BaseAnalysisTask
---
 fe/fe-core/src/main/cup/sql_parser.cup             |   2 +-
 .../org/apache/doris/analysis/AnalyzeStmt.java     | 252 ++++++---------------
 .../org/apache/doris/analysis/SelectListItem.java  |   3 +
 .../java/org/apache/doris/analysis/SelectStmt.java |  30 ++-
 .../java/org/apache/doris/analysis/TableRef.java   |  16 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  24 +-
 .../doris/catalog/InternalSchemaInitializer.java   |  23 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  17 +-
 .../main/java/org/apache/doris/catalog/Table.java  |   8 +-
 .../java/org/apache/doris/catalog/TableIf.java     |   8 +-
 .../doris/catalog/external/ExternalTable.java      |   8 +-
 .../doris/catalog/external/HMSExternalTable.java   |  16 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   6 +-
 .../org/apache/doris/statistics/AnalysisJob.java   | 207 -----------------
 .../apache/doris/statistics/AnalysisManager.java   | 145 ++++++++++++
 .../apache/doris/statistics/AnalysisState.java}    |  16 +-
 ...sJobExecutor.java => AnalysisTaskExecutor.java} |  34 +--
 ...{AnalysisJobInfo.java => AnalysisTaskInfo.java} |  72 +++---
 .../doris/statistics/AnalysisTaskInfoBuilder.java  | 115 ++++++++++
 ...obScheduler.java => AnalysisTaskScheduler.java} |  67 ++----
 ...sisJobWrapper.java => AnalysisTaskWrapper.java} |  38 ++--
 .../apache/doris/statistics/BaseAnalysisTask.java  | 160 +++++++++++++
 .../{HMSAnalysisJob.java => HMSAnalysisTask.java}  |   6 +-
 ...{HiveAnalysisJob.java => HiveAnalysisTask.java} |   8 +-
 ...rgAnalysisJob.java => IcebergAnalysisTask.java} |   6 +-
 .../apache/doris/statistics/MVAnalysisTask.java    | 144 ++++++++++++
 .../apache/doris/statistics/OlapAnalysisTask.java  | 110 +++++++++
 .../org/apache/doris/statistics/StatisticsJob.java |  16 --
 .../doris/statistics/StatisticsJobManager.java     |  35 ---
 .../doris/statistics/StatisticsRepository.java     |  59 ++---
 .../doris/statistics/util/StatisticsUtil.java      |   7 +-
 .../org/apache/doris/analysis/SqlModeTest.java     |   4 +-
 .../doris/clone/TabletRepairAndBalanceTest.java    |   3 +-
 .../doris/clone/TabletReplicaTooSlowTest.java      |   3 +-
 .../doris/cluster/DecommissionBackendTest.java     |  11 +
 .../doris/nereids/datasets/ssb/SSBTestBase.java    |   9 +
 .../doris/statistics/AnalysisJobExecutorTest.java  | 103 ---------
 .../apache/doris/statistics/AnalysisJobTest.java   |  24 +-
 .../doris/statistics/AnalysisTaskExecutorTest.java | 112 +++++++++
 .../apache/doris/statistics/MVStatisticsTest.java  |  85 +++++++
 .../apache/doris/utframe/TestWithFeService.java    |   6 +-
 41 files changed, 1212 insertions(+), 806 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 68d209ffe6..0735311f09 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2619,7 +2619,7 @@ show_create_routine_load_stmt ::=
 
 // analyze statment
 analyze_stmt ::=
-    KW_ANALYZE opt_table_name:tbl opt_col_list:cols 
opt_partition_names:partitionNames opt_properties:properties
+    KW_ANALYZE table_name:tbl opt_col_list:cols 
opt_partition_names:partitionNames opt_properties:properties
     {:
         RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties);
     :}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
index f99e172c56..f5ea7fabb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -39,14 +38,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -73,198 +70,69 @@ public class AnalyzeStmt extends DdlStmt {
 
     private static final Predicate<Long> DESIRED_TASK_TIMEOUT_SEC = (v) -> v > 
0L;
 
-    private final TableName optTableName;
+    public final boolean wholeTbl;
+
+    private final TableName tableName;
+
+    private TableIf table;
+
     private final PartitionNames optPartitionNames;
     private List<String> optColumnNames;
     private Map<String, String> optProperties;
 
     // after analyzed
     private long dbId;
-    private final Set<Long> tblIds = Sets.newHashSet();
+
     private final List<String> partitionNames = Lists.newArrayList();
 
-    // TODO(wzt): support multiple tables
-    public AnalyzeStmt(TableName optTableName,
+    public AnalyzeStmt(TableName tableName,
             List<String> optColumnNames,
             PartitionNames optPartitionNames,
             Map<String, String> optProperties) {
-        this.optTableName = optTableName;
+        this.tableName = tableName;
         this.optColumnNames = optColumnNames;
         this.optPartitionNames = optPartitionNames;
+        wholeTbl = CollectionUtils.isEmpty(optColumnNames);
         this.optProperties = optProperties;
     }
 
-    public long getDbId() {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The dbId must be obtained after the parsing is complete");
-        return dbId;
-    }
-
-    public Set<Long> getTblIds() {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The tblIds must be obtained after the parsing is complete");
-        return tblIds;
-    }
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
 
-    public Database getDb() throws AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The db must be obtained after the parsing is complete");
-        return 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
-    }
+        tableName.analyze(analyzer);
 
-    public List<Table> getTables() throws AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The tables must be obtained after the parsing is complete");
-        Database db = getDb();
-        List<Table> tables = Lists.newArrayList();
-
-        db.readLock();
-        try {
-            for (Long tblId : tblIds) {
-                Table table = db.getTableOrAnalysisException(tblId);
-                tables.add(table);
-            }
-        } finally {
-            db.readUnlock();
-        }
+        String catalogName = tableName.getCtl();
+        String dbName = tableName.getDb();
+        String tblName = tableName.getTbl();
+        CatalogIf catalog = 
analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
+        DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
+        table = db.getTableOrAnalysisException(tblName);
 
-        return tables;
-    }
+        checkAnalyzePriv(dbName, tblName);
 
-    public List<String> getPartitionNames() {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The partitionNames must be obtained after the parsing is 
complete");
-        return partitionNames;
-    }
-
-    /**
-     * The statistics task obtains partitions and then collects partition 
statistics,
-     * we need to filter out partitions that do not have data.
-     *
-     * @return map of tableId and partitionName
-     * @throws AnalysisException not analyzed
-     */
-    public Map<Long, List<String>> getTableIdToPartitionName() throws 
AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The partitionIds must be obtained after the parsing is 
complete");
-        Map<Long, List<String>> tableIdToPartitionName = Maps.newHashMap();
-
-        for (Table table : getTables()) {
+        if (optColumnNames != null && !optColumnNames.isEmpty()) {
             table.readLock();
             try {
-                OlapTable olapTable = (OlapTable) table;
-                List<String> partitionNames = getPartitionNames();
-                List<String> newPartitionNames = new 
ArrayList<>(partitionNames);
-                if (newPartitionNames.isEmpty() && olapTable.isPartitioned()) {
-                    newPartitionNames.addAll(olapTable.getPartitionNames());
+                List<String> baseSchema = table.getBaseSchema(false)
+                        
.stream().map(Column::getName).collect(Collectors.toList());
+                Optional<String> optional = optColumnNames.stream()
+                        .filter(entity -> 
!baseSchema.contains(entity)).findFirst();
+                if (optional.isPresent()) {
+                    String columnName = optional.get();
+                    
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
+                            columnName, FeNameFormat.getColumnNameRegex());
                 }
-                tableIdToPartitionName.put(table.getId(), newPartitionNames);
             } finally {
                 table.readUnlock();
             }
-        }
-        return tableIdToPartitionName;
-    }
-
-    public Map<Long, List<String>> getTableIdToColumnName() throws 
AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The db name must be obtained after the parsing is complete");
-        Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-        List<Table> tables = getTables();
-        if (optColumnNames == null || optColumnNames.isEmpty()) {
-            for (Table table : tables) {
-                table.readLock();
-                try {
-                    long tblId = table.getId();
-                    List<Column> baseSchema = table.getBaseSchema();
-                    List<String> colNames = Lists.newArrayList();
-                    
baseSchema.stream().map(Column::getName).forEach(colNames::add);
-                    tableIdToColumnName.put(tblId, colNames);
-                } finally {
-                    table.readUnlock();
-                }
-            }
-        } else {
-            for (Long tblId : tblIds) {
-                tableIdToColumnName.put(tblId, optColumnNames);
-            }
-        }
-
-        return tableIdToColumnName;
-    }
-
-    public Map<String, String> getProperties() {
-        return optProperties;
-    }
-
-    @Override
-    public void analyze(Analyzer analyzer) throws UserException {
-        super.analyze(analyzer);
-
-        // step1: analyze db, table and column
-        if (optTableName != null) {
-            optTableName.analyze(analyzer);
-
-            String catalogName = optTableName.getCtl();
-            String dbName = optTableName.getDb();
-            String tblName = optTableName.getTbl();
-            CatalogIf catalog = 
analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
-            DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
-            TableIf table = db.getTableOrAnalysisException(tblName);
-
-            checkAnalyzePriv(dbName, tblName);
-
-            if (optColumnNames != null && !optColumnNames.isEmpty()) {
-                table.readLock();
-                try {
-                    List<String> baseSchema = table.getBaseSchema(false)
-                            
.stream().map(Column::getName).collect(Collectors.toList());
-                    Optional<String> optional = optColumnNames.stream()
-                            .filter(entity -> 
!baseSchema.contains(entity)).findFirst();
-                    if (optional.isPresent()) {
-                        String columnName = optional.get();
-                        
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
-                                columnName, FeNameFormat.getColumnNameRegex());
-                    }
-                } finally {
-                    table.readUnlock();
-                }
-            } else {
-                optColumnNames = table.getBaseSchema(false)
-                        
.stream().map(Column::getName).collect(Collectors.toList());
-            }
-
-            dbId = db.getId();
-            tblIds.add(table.getId());
         } else {
-            // analyze the current default db
-            String dbName = analyzer.getDefaultDb();
-            if (Strings.isNullOrEmpty(dbName)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
-            }
-            Database db = 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
-
-            db.readLock();
-            try {
-                List<Table> tables = db.getTables();
-                for (Table table : tables) {
-                    checkAnalyzeType(table);
-                    checkAnalyzePriv(dbName, table.getName());
-                }
-
-                dbId = db.getId();
-                for (Table table : tables) {
-                    long tblId = table.getId();
-                    tblIds.add(tblId);
-                }
-            } finally {
-                db.readUnlock();
-            }
+            optColumnNames = table.getBaseSchema(false)
+                    
.stream().map(Column::getName).collect(Collectors.toList());
         }
-
+        dbId = db.getId();
         // step2: analyze partition
         checkPartitionNames();
-
         // step3: analyze properties
         checkProperties();
     }
@@ -286,18 +154,12 @@ public class AnalyzeStmt extends DdlStmt {
         }
     }
 
-    private void checkAnalyzeType(Table table) throws AnalysisException {
-        if (table.getType() != Table.TableType.OLAP) {
-            throw new AnalysisException("Only OLAP table statistics are 
supported");
-        }
-    }
-
     private void checkPartitionNames() throws AnalysisException {
         if (optPartitionNames != null) {
             optPartitionNames.analyze(analyzer);
-            if (optTableName != null) {
-                Database db = 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(optTableName.getDb());
-                OlapTable olapTable = (OlapTable) 
db.getTableOrAnalysisException(optTableName.getTbl());
+            if (tableName != null) {
+                Database db = 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tableName.getDb());
+                OlapTable olapTable = (OlapTable) 
db.getTableOrAnalysisException(tableName.getTbl());
                 if (!olapTable.isPartitioned()) {
                     throw new AnalysisException("Not a partitioned table: " + 
olapTable.getName());
                 }
@@ -340,9 +202,9 @@ public class AnalyzeStmt extends DdlStmt {
         StringBuilder sb = new StringBuilder();
         sb.append("ANALYZE");
 
-        if (optTableName != null) {
+        if (tableName != null) {
             sb.append(" ");
-            sb.append(optTableName.toSql());
+            sb.append(tableName.toSql());
         }
 
         if (optColumnNames != null) {
@@ -369,18 +231,46 @@ public class AnalyzeStmt extends DdlStmt {
     }
 
     public String getCatalogName() {
-        return optTableName.getCtl();
+        return tableName.getCtl();
     }
 
     public String getDBName() {
-        return optTableName.getDb();
+        return tableName.getDb();
     }
 
-    public String getTblName() {
-        return optTableName.getTbl();
+    public TableName getTblName() {
+        return tableName;
     }
 
     public List<String> getOptColumnNames() {
         return optColumnNames;
     }
+
+
+    public long getDbId() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The dbId must be obtained after the parsing is complete");
+        return dbId;
+    }
+
+    public Database getDb() throws AnalysisException {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The db must be obtained after the parsing is complete");
+        return 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
+    }
+
+    public TableIf getTable() {
+        return table;
+    }
+
+    public List<String> getPartitionNames() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The partitionNames must be obtained after the parsing is 
complete");
+        return partitionNames;
+    }
+
+    public Map<String, String> getProperties() {
+        return optProperties;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java
index 643da0095a..b757619867 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java
@@ -132,4 +132,7 @@ public class SelectListItem {
         return expr.toColumnLabel();
     }
 
+    public void setAlias(String alias) {
+        this.alias = alias;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 3f754595f1..b03ec0a691 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -1833,18 +1833,26 @@ public class SelectStmt extends QueryStmt {
         if (selectList.isDistinct()) {
             strBuilder.append("DISTINCT ");
         }
-        for (int i = 0; i < resultExprs.size(); ++i) {
-            // strBuilder.append(selectList.getItems().get(i).toSql());
-            // strBuilder.append((i + 1 != selectList.getItems().size()) ? ", 
" : "");
-            if (i != 0) {
-                strBuilder.append(", ");
-            }
-            if (needToSql) {
-                strBuilder.append(originalExpr.get(i).toSql());
-            } else {
-                strBuilder.append(resultExprs.get(i).toSql());
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null || ctx.getSessionVariable().internalSession) {
+            for (int i = 0; i < selectList.getItems().size(); i++) {
+                strBuilder.append(selectList.getItems().get(i).toSql());
+                strBuilder.append((i + 1 != selectList.getItems().size()) ? ", 
" : "");
+            }
+        } else {
+            for (int i = 0; i < resultExprs.size(); ++i) {
+                // strBuilder.append(selectList.getItems().get(i).toSql());
+                // strBuilder.append((i + 1 != selectList.getItems().size()) ? 
", " : "");
+                if (i != 0) {
+                    strBuilder.append(", ");
+                }
+                if (needToSql) {
+                    strBuilder.append(originalExpr.get(i).toSql());
+                } else {
+                    strBuilder.append(resultExprs.get(i).toSql());
+                }
+                strBuilder.append(" AS 
").append(SqlUtils.getIdentSql(colLabels.get(i)));
             }
-            strBuilder.append(" AS 
").append(SqlUtils.getIdentSql(colLabels.get(i)));
         }
 
         // From clause
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 0aa5d59291..f5c37e5fd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.StringJoiner;
 
 /**
  * Superclass of all table references, including references to views, base 
tables
@@ -226,6 +227,13 @@ public class TableRef implements ParseNode, Writable {
             output.append("[").append(Joiner.on(", 
").join(joinHints)).append("] ");
         }
         output.append(tableRefToSql()).append(" ");
+        if (partitionNames != null) {
+            StringJoiner sj = new StringJoiner(",", "", " ");
+            for (String partName : partitionNames.getPartitionNames()) {
+                sj.add(partName);
+            }
+            output.append(sj.toString());
+        }
         if (usingColNames != null) {
             output.append("USING (").append(Joiner.on(", 
").join(usingColNames)).append(")");
         } else if (onClause != null) {
@@ -864,4 +872,12 @@ public class TableRef implements ParseNode, Writable {
             aliases = new String[]{alias};
         }
     }
+
+    public void setPartitionNames(PartitionNames partitionNames) {
+        this.partitionNames = partitionNames;
+    }
+
+    public void setName(TableName name) {
+        this.name = name;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f57f262526..e29713731f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -208,12 +208,12 @@ import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsJobManager;
 import org.apache.doris.statistics.StatisticsJobScheduler;
 import org.apache.doris.statistics.StatisticsManager;
-import org.apache.doris.statistics.StatisticsRepository;
 import org.apache.doris.statistics.StatisticsTaskScheduler;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.FQDNManager;
@@ -441,9 +441,7 @@ public class Env {
 
     private MTMVJobManager mtmvJobManager;
 
-    private final AnalysisJobScheduler analysisJobScheduler;
-
-    private final StatisticsCache statisticsCache;
+    private AnalysisManager analysisManager;
 
     private ExternalMetaCacheMgr extMetaCacheMgr;
 
@@ -647,10 +645,9 @@ public class Env {
         this.refreshManager = new RefreshManager();
         this.policyMgr = new PolicyMgr();
         this.mtmvJobManager = new MTMVJobManager();
-        this.analysisJobScheduler = new AnalysisJobScheduler();
-        this.statisticsCache = new StatisticsCache();
         this.extMetaCacheMgr = new ExternalMetaCacheMgr();
         this.fqdnManager = new FQDNManager(systemInfo);
+        this.analysisManager = new AnalysisManager();
     }
 
     public static void destroyCheckpoint() {
@@ -1653,7 +1650,7 @@ public class Env {
     }
 
     public StatisticsCache getStatisticsCache() {
-        return statisticsCache;
+        return analysisManager.getStatisticsCache();
     }
 
     public boolean hasReplayer() {
@@ -5233,8 +5230,8 @@ public class Env {
         return count;
     }
 
-    public AnalysisJobScheduler getAnalysisJobScheduler() {
-        return analysisJobScheduler;
+    public AnalysisTaskScheduler getAnalysisJobScheduler() {
+        return analysisManager.taskScheduler;
     }
 
     // TODO:
@@ -5242,6 +5239,11 @@ public class Env {
     //  2. support sample job
     //  3. support period job
     public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
-        StatisticsRepository.createAnalysisJob(analyzeStmt);
+        analysisManager.createAnalysisJob(analyzeStmt);
+    }
+
+    public AnalysisManager getAnalysisManager() {
+        return analysisManager;
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 77294b3b76..8cafd7d777 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -44,24 +44,22 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class InternalSchemaInitializer extends Thread {
 
     private static final Logger LOG = 
LogManager.getLogger(InternalSchemaInitializer.class);
 
-    public static boolean forTest = false;
-
     /**
      * If internal table creation failed, will retry after below seconds.
      */
     public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1;
 
-
     public void run() {
-        if (forTest) {
+        if (FeConstants.runningUnitTest) {
             return;
         }
-        while (true) {
+        while (!created()) {
             FrontendNodeType feType = Env.getCurrentEnv().getFeType();
             if (feType.equals(FrontendNodeType.INIT) || 
feType.equals(FrontendNodeType.UNKNOWN)) {
                 LOG.warn("FE is not ready");
@@ -72,7 +70,6 @@ public class InternalSchemaInitializer extends Thread {
                         .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 
1000L);
                 createDB();
                 createTbl();
-                break;
             } catch (Throwable e) {
                 LOG.warn("Statistics storage initiated failed, will try again 
later", e);
             }
@@ -145,10 +142,12 @@ public class InternalSchemaInitializer extends Thread {
                 FeConstants.INTERNAL_DB_NAME, 
StatisticConstants.ANALYSIS_JOB_TABLE);
         List<ColumnDef> columnDefs = new ArrayList<>();
         columnDefs.add(new ColumnDef("job_id", 
TypeDef.create(PrimitiveType.BIGINT)));
+        columnDefs.add(new ColumnDef("task_id", 
TypeDef.create(PrimitiveType.BIGINT)));
         columnDefs.add(new ColumnDef("catalog_name", 
TypeDef.createVarchar(1024)));
         columnDefs.add(new ColumnDef("db_name", TypeDef.createVarchar(1024)));
         columnDefs.add(new ColumnDef("tbl_name", TypeDef.createVarchar(1024)));
         columnDefs.add(new ColumnDef("col_name", TypeDef.createVarchar(1024)));
+        columnDefs.add(new ColumnDef("index_id", 
TypeDef.create(PrimitiveType.BIGINT)));
         columnDefs.add(new ColumnDef("job_type", TypeDef.createVarchar(32)));
         columnDefs.add(new ColumnDef("analysis_type", 
TypeDef.createVarchar(32)));
         columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024)));
@@ -175,4 +174,16 @@ public class InternalSchemaInitializer extends Thread {
         return createTableStmt;
     }
 
+    private boolean created() {
+        Optional<Database> optionalDatabase =
+                Env.getCurrentEnv().getInternalCatalog()
+                        .getDb(SystemInfoService.DEFAULT_CLUSTER + ":" + 
FeConstants.INTERNAL_DB_NAME);
+        if (!optionalDatabase.isPresent()) {
+            return false;
+        }
+        Database db = optionalDatabase.get();
+        return db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent()
+                && 
db.getTable(StatisticConstants.ANALYSIS_JOB_TABLE).isPresent();
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 91cd500ddf..c43aef26eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -47,9 +47,12 @@ import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.resource.Tag;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.MVAnalysisTask;
+import org.apache.doris.statistics.OlapAnalysisTask;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TCompressionType;
@@ -997,8 +1000,11 @@ public class OlapTable extends Table {
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, 
AnalysisJobInfo info) {
-        return new AnalysisJob(scheduler, info);
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler 
scheduler, AnalysisTaskInfo info) {
+        if (info.analysisType.equals(AnalysisType.COLUMN)) {
+            return new OlapAnalysisTask(scheduler, info);
+        }
+        return new MVAnalysisTask(scheduler, info);
     }
 
     @Override
@@ -1939,4 +1945,5 @@ public class OlapTable extends Table {
     public Set<Long> getPartitionKeys() {
         return idToPartition.keySet();
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 24c33bd837..56f02f9832 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -26,9 +26,9 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.external.hudi.HudiTable;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import com.google.common.base.Preconditions;
@@ -508,7 +508,7 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, 
AnalysisJobInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler 
scheduler, AnalysisTaskInfo info) {
         throw new NotImplementedException();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 361ef44ba2..6aebe1feb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -20,9 +20,9 @@ package org.apache.doris.catalog;
 import org.apache.doris.alter.AlterCancelException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import java.util.Collections;
@@ -111,7 +111,7 @@ public interface TableIf {
 
     TTableDescriptor toThrift();
 
-    AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, 
AnalysisJobInfo info);
+    BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, 
AnalysisTaskInfo info);
 
     /**
      * Doris table type.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index eeca74ff5c..c4cb6aad00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -29,9 +29,9 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import com.google.gson.annotations.SerializedName;
@@ -301,7 +301,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, 
AnalysisJobInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler 
scheduler, AnalysisTaskInfo info) {
         throw new NotImplementedException();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index bfefe1f829..d3e991146f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -22,11 +22,11 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.PooledHiveMetaStoreClient;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
-import org.apache.doris.statistics.HiveAnalysisJob;
-import org.apache.doris.statistics.IcebergAnalysisJob;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.HiveAnalysisTask;
+import org.apache.doris.statistics.IcebergAnalysisTask;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -268,13 +268,13 @@ public class HMSExternalTable extends ExternalTable {
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, 
AnalysisJobInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler 
scheduler, AnalysisTaskInfo info) {
         makeSureInitialized();
         switch (dlaType) {
             case HIVE:
-                return new HiveAnalysisJob(scheduler, info);
+                return new HiveAnalysisTask(scheduler, info);
             case ICEBERG:
-                return new IcebergAnalysisJob(scheduler, info);
+                return new IcebergAnalysisTask(scheduler, info);
             default:
                 throw new IllegalArgumentException("Analysis job for dlaType " 
+ dlaType + " not supported.");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7a83d8db8d..38503d1263 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1791,12 +1791,13 @@ public class StmtExecutor implements ProfileWriter {
 
     public List<ResultRow> executeInternalQuery() {
         try {
+            List<ResultRow> resultRows = new ArrayList<>();
             analyzer = new Analyzer(context.getEnv(), context);
             try {
                 analyze(context.getSessionVariable().toThrift());
             } catch (UserException e) {
                 LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, 
e);
-                return null;
+                return resultRows;
             }
             planner.getFragments();
             RowBatch batch;
@@ -1821,7 +1822,6 @@ public class StmtExecutor implements ProfileWriter {
             }
             Span fetchResultSpan = context.getTracer().spanBuilder("fetch 
internal SQL result")
                     .setParent(Context.current()).startSpan();
-            List<ResultRow> resultRows = new ArrayList<>();
             try (Scope scope = fetchResultSpan.makeCurrent()) {
                 while (true) {
                     batch = coord.getNext();
@@ -1834,7 +1834,7 @@ public class StmtExecutor implements ProfileWriter {
             } catch (Exception e) {
                 LOG.warn("Unexpected exception when SQL running", e);
                 fetchResultSpan.recordException(e);
-                return null;
+                return resultRows;
             } finally {
                 fetchResultSpan.end();
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
deleted file mode 100644
index 06fb36e644..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ /dev/null
@@ -1,207 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.AnalysisJobInfo.JobState;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.commons.text.StringSubstitutor;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class AnalysisJob {
-
-    private final AnalysisJobScheduler analysisJobScheduler;
-
-    protected final AnalysisJobInfo info;
-
-    protected CatalogIf catalog;
-
-    protected DatabaseIf db;
-
-    protected TableIf tbl;
-
-    protected Column col;
-
-    protected StmtExecutor stmtExecutor;
-
-    public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, 
AnalysisJobInfo info) {
-        this.analysisJobScheduler = analysisJobScheduler;
-        this.info = info;
-        init(info);
-    }
-
-    private void init(AnalysisJobInfo info) {
-        catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
-        if (catalog == null) {
-            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
-                    String.format("Catalog with name: %s not exists", 
info.dbName), System.currentTimeMillis());
-            return;
-        }
-        db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
-        if (db == null) {
-            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
-                    String.format("DB with name %s not exists", info.dbName), 
System.currentTimeMillis());
-            return;
-        }
-        tbl = (TableIf) db.getTable(info.tblName).orElse(null);
-        if (tbl == null) {
-            analysisJobScheduler.updateJobStatus(
-                    info.jobId, JobState.FAILED,
-                    String.format("Table with name %s not exists", 
info.tblName), System.currentTimeMillis());
-        }
-        col = tbl.getColumn(info.colName);
-        if (col == null) {
-            analysisJobScheduler.updateJobStatus(
-                    info.jobId, JobState.FAILED, String.format("Column with 
name %s not exists", info.tblName),
-                    System.currentTimeMillis());
-        }
-    }
-
-    private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
-            + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(${colName}) AS ndv, "
-            + "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
-            + "MIN(${colName}) AS min, "
-            + "MAX(${colName}) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW()"
-            + "FROM `${dbName}`.`${tblName}` "
-            + "PARTITION ${partName}";
-
-    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + "    SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, 
row_count, "
-            + "        ndv, null_count, min, max, data_size, update_time\n"
-            + "    FROM \n"
-            + "     (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, "
-            + "         ${catalogId} AS catalog_id, "
-            + "         ${dbId} AS db_id, "
-            + "         ${tblId} AS tbl_id, "
-            + "         '${colId}' AS col_id, "
-            + "         NULL AS part_id, "
-            + "         SUM(count) AS row_count, \n"
-            + "         SUM(null_count) AS null_count, "
-            + "         MIN(CAST(min AS ${type})) AS min, "
-            + "         MAX(CAST(max AS ${type})) AS max, "
-            + "         SUM(data_size_in_bytes) AS data_size, "
-            + "         NOW() AS update_time\n"
-            + "     FROM ${internalDB}.${columnStatTbl}"
-            + "     WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND 
"
-            + "     ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
-            + "     ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
-            + "     ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
-            + "     ) t1, \n"
-            + "     (SELECT NDV(${colName}) AS ndv FROM 
`${dbName}`.`${tblName}`) t2\n";
-
-    private String getDataSizeFunction() {
-        if (col.getType().isStringType()) {
-            return "SUM(LENGTH(${colName}))";
-        }
-        return "COUNT(1) * " + col.getType().getSlotSize();
-    }
-
-    public void execute() throws Exception {
-        Map<String, String> params = new HashMap<>();
-        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
-        params.put("catalogId", String.valueOf(catalog.getId()));
-        params.put("dbId", String.valueOf(db.getId()));
-        params.put("tblId", String.valueOf(tbl.getId()));
-        params.put("colId", String.valueOf(info.colName));
-        params.put("dataSizeFunction", getDataSizeFunction());
-        params.put("dbName", info.dbName);
-        params.put("colName", String.valueOf(info.colName));
-        params.put("tblName", String.valueOf(info.tblName));
-        List<String> partitionAnalysisSQLs = new ArrayList<>();
-        try {
-            tbl.readLock();
-            Set<String> partNames = ((Table) tbl).getPartitionNames();
-            for (String partName : partNames) {
-                Partition part = ((Table) tbl).getPartition(partName);
-                if (part == null) {
-                    continue;
-                }
-                params.put("partId", String.valueOf(((Table) 
tbl).getPartition(partName).getId()));
-                params.put("partName", String.valueOf(partName));
-                StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
-                
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
-            }
-        } finally {
-            tbl.readUnlock();
-        }
-        for (String sql : partitionAnalysisSQLs) {
-            try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext()) {
-                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-                this.stmtExecutor.execute();
-            }
-        }
-        params.remove("partId");
-        params.put("type", col.getType().toString());
-        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            this.stmtExecutor.execute();
-            Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), 
col.getName());
-        }
-    }
-
-    public int getLastExecTime() {
-        return info.lastExecTimeInMs;
-    }
-
-    public void cancel() {
-        if (stmtExecutor != null) {
-            stmtExecutor.cancel();
-        }
-        analysisJobScheduler
-                .updateJobStatus(info.jobId, JobState.FAILED,
-                        String.format("Job has been cancelled: %s", 
info.toString()), -1);
-    }
-
-    public void updateState(JobState jobState) {
-        info.updateState(jobState);
-    }
-
-    public long getJobId() {
-        return info.jobId;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
new file mode 100644
index 0000000000..3833acc20f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
+import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Turns an ANALYZE statement into one analysis job made of several tasks and tracks those tasks.
+ *
+ * <p>A job gets one task per requested column and, when the whole OLAP table is analyzed, one more
+ * task per index whose meta carries a define statement. Each task is persisted through
+ * {@link StatisticsRepository}, handed to the {@link AnalysisTaskScheduler}, and remembered in
+ * memory until every task of its job is FINISHED or FAILED.
+ */
+public class AnalysisManager {
+
+    private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
+
+    // NOTE(review): the statement filters on job_id only, so it presumably touches every task row of
+    // the job rather than just the reporting task; confirm against the analysis job table schema.
+    private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
+            + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
+            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
+
+    // jobId -> (taskId -> task); an entry is dropped once all of its tasks reached a terminal state.
+    private final ConcurrentMap<Long, Map<Long, AnalysisTaskInfo>> analysisJobIdToTaskMap;
+
+    public final AnalysisTaskScheduler taskScheduler;
+
+    private StatisticsCache statisticsCache;
+
+    private final AnalysisTaskExecutor taskExecutor;
+
+    /**
+     * Wires the scheduler, executor and statistics cache together and starts the executor thread.
+     */
+    public AnalysisManager() {
+        analysisJobIdToTaskMap = new ConcurrentHashMap<>();
+        this.taskScheduler = new AnalysisTaskScheduler();
+        taskExecutor = new AnalysisTaskExecutor(taskScheduler);
+        this.statisticsCache = new StatisticsCache();
+        taskExecutor.start();
+    }
+
+    /**
+     * Creates, persists and schedules the tasks of one analysis job.
+     *
+     * @param analyzeStmt the ANALYZE statement supplying catalog, database, table and column names
+     * @throws RuntimeException if a task cannot be persisted; tasks persisted before the failure are
+     *         neither rolled back nor scheduled
+     */
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+        String catalogName = analyzeStmt.getCatalogName();
+        String db = analyzeStmt.getDBName();
+        TableName tbl = analyzeStmt.getTblName();
+        // Return value is unused here; presumably the call only verifies the name resolves.
+        StatisticsUtil.convertTableNameToObjects(tbl);
+        List<String> colNames = analyzeStmt.getOptColumnNames();
+        Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
+        long jobId = Env.getCurrentEnv().getNextId();
+        if (colNames != null) {
+            for (String colName : colNames) {
+                long taskId = Env.getCurrentEnv().getNextId();
+                AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
+                        .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+                        .setTblName(tbl.getTbl()).setColName(colName).setJobType(JobType.MANUAL)
+                        .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.COLUMN)
+                        .setScheduleType(ScheduleType.ONCE).build();
+                persistTask(analysisTaskInfo);
+                analysisTaskInfos.put(taskId, analysisTaskInfo);
+            }
+        }
+        if (analyzeStmt.wholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
+            OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+            // Lock before entering try so that a failed acquisition never reaches readUnlock().
+            olapTable.readLock();
+            try {
+                for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
+                    if (meta.getDefineStmt() == null) {
+                        continue;
+                    }
+                    long taskId = Env.getCurrentEnv().getNextId();
+                    AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
+                            .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+                            .setTblName(tbl.getTbl()).setIndexId(meta.getIndexId())
+                            .setJobType(JobType.MANUAL)
+                            .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
+                            .setScheduleType(ScheduleType.ONCE).build();
+                    persistTask(analysisTaskInfo);
+                    analysisTaskInfos.put(taskId, analysisTaskInfo);
+                }
+            } finally {
+                olapTable.readUnlock();
+            }
+        }
+        if (analysisTaskInfos.isEmpty()) {
+            // Nothing to run: registering an empty job would leave an entry that is never removed.
+            return;
+        }
+        analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
+        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+    }
+
+    /**
+     * Persists a single task.
+     *
+     * @throws RuntimeException wrapping whatever {@link StatisticsRepository#createAnalysisTask} threw
+     */
+    private void persistTask(AnalysisTaskInfo analysisTaskInfo) {
+        try {
+            StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create analysis job", e);
+        }
+    }
+
+    /**
+     * Records a task's new state in the job table and in memory, then forgets the job once all of its
+     * tasks are done.
+     *
+     * <p>A failed table update is only logged; the in-memory state is updated regardless.
+     *
+     * @param info the task whose state changed
+     * @param jobState the new state
+     * @param message optional text stored with the state; skipped when null or empty
+     * @param time value for last_exec_time_in_ms, or -1 to leave that column untouched
+     */
+    public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
+        Map<String, String> params = new HashMap<>();
+        params.put("jobState", jobState.toString());
+        // NOTE(review): message is spliced into the SQL text unescaped; a quote inside it would break
+        // the statement. Confirm callers only pass quote-free text or escape it here.
+        params.put("message",
+                StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
+        params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time);
+        params.put("jobId", String.valueOf(info.jobId));
+        try {
+            StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
+        } catch (Exception e) {
+            LOG.warn("Failed to update state for job: {}", info.jobId, e);
+        } finally {
+            info.state = jobState;
+            // The job may already be gone (all siblings finished earlier, or the task was never
+            // registered here), so guard against a missing entry instead of dereferencing null.
+            Map<Long, AnalysisTaskInfo> tasks = analysisJobIdToTaskMap.get(info.jobId);
+            if (tasks != null && tasks.values().stream().allMatch(t -> isTerminal(t.state))) {
+                analysisJobIdToTaskMap.remove(info.jobId);
+            }
+        }
+    }
+
+    /**
+     * Tells whether a task has stopped for good. A null state counts as not started yet, so a job is
+     * kept until each of its tasks has actually reported FINISHED or FAILED.
+     */
+    private static boolean isTerminal(AnalysisState state) {
+        return state == AnalysisState.FINISHED || state == AnalysisState.FAILED;
+    }
+
+    public StatisticsCache getStatisticsCache() {
+        return statisticsCache;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java
similarity index 67%
copy from fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java
copy to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java
index 691da60173..bab8a462e8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java
@@ -15,15 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.datasets.ssb;
+package org.apache.doris.statistics;
 
-import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
-
-public abstract class SSBTestBase extends AnalyzeCheckTestBase {
-    @Override
-    protected void runBeforeAll() throws Exception {
-        createDatabase("test");
-        connectContext.setDatabase("default_cluster:test");
-        SSBUtils.createTables(this);
-    }
+public enum AnalysisState {
+    PENDING,
+    RUNNING,
+    FINISHED,
+    FAILED;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
similarity index 75%
rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java
rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index c73bf349b8..ff98890cf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -17,10 +17,10 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
-import org.apache.doris.statistics.AnalysisJobInfo.JobState;
 import org.apache.doris.statistics.util.BlockingCounter;
 
 import org.apache.logging.log4j.LogManager;
@@ -33,9 +33,9 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-public class AnalysisJobExecutor extends Thread {
+public class AnalysisTaskExecutor extends Thread {
 
-    private static final Logger LOG = 
LogManager.getLogger(AnalysisJobExecutor.class);
+    private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskExecutor.class);
 
     private final ThreadPoolExecutor executors = 
ThreadPoolManager.newDaemonThreadPool(
             Config.statistics_simultaneously_running_job_num,
@@ -44,17 +44,17 @@ public class AnalysisJobExecutor extends Thread {
             new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
             "Analysis Job Executor", true);
 
-    private final AnalysisJobScheduler jobScheduler;
+    private final AnalysisTaskScheduler taskScheduler;
 
     private final BlockingCounter blockingCounter =
             new 
BlockingCounter(Config.statistics_simultaneously_running_job_num);
 
-    private final BlockingQueue<AnalysisJobWrapper> jobQueue =
-            new PriorityBlockingQueue<AnalysisJobWrapper>(20,
-                    
Comparator.comparingLong(AnalysisJobWrapper::getStartTime));
+    private final BlockingQueue<AnalysisTaskWrapper> jobQueue =
+            new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
+                    
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
-    public AnalysisJobExecutor(AnalysisJobScheduler jobExecutor) {
-        this.jobScheduler = jobExecutor;
+    public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) {
+        this.taskScheduler = jobExecutor;
     }
 
     @Override
@@ -73,12 +73,12 @@ public class AnalysisJobExecutor extends Thread {
     private void doCancelExpiredJob() {
         for (;;) {
             try {
-                AnalysisJobWrapper jobWrapper = jobQueue.take();
+                AnalysisTaskWrapper taskWrapper = jobQueue.take();
                 try {
                     long timeout = 
StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS;
-                    jobWrapper.get(timeout < 0 ? 0 : timeout, 
TimeUnit.MILLISECONDS);
+                    taskWrapper.get(timeout < 0 ? 0 : timeout, 
TimeUnit.MILLISECONDS);
                 } catch (Exception e) {
-                    jobWrapper.cancel();
+                    taskWrapper.cancel();
                 }
             } catch (Throwable throwable) {
                 LOG.warn(throwable);
@@ -101,11 +101,13 @@ public class AnalysisJobExecutor extends Thread {
     }
 
     private void doFetchAndExecute() {
-        AnalysisJob job = jobScheduler.getPendingJobs();
-        AnalysisJobWrapper jobWrapper = new AnalysisJobWrapper(this, job);
+        BaseAnalysisTask task = taskScheduler.getPendingTasks();
+        AnalysisTaskWrapper jobWrapper = new AnalysisTaskWrapper(this, task);
         incr();
-        jobScheduler.updateJobStatus(job.getJobId(), JobState.RUNNING, "", -1);
         executors.submit(jobWrapper);
+        Env.getCurrentEnv().getAnalysisManager()
+                .updateTaskStatus(task.info,
+                        AnalysisState.RUNNING, "", System.currentTimeMillis());
     }
 
     public void decr() {
@@ -116,7 +118,7 @@ public class AnalysisJobExecutor extends Thread {
         blockingCounter.incr();
     }
 
-    public void putJob(AnalysisJobWrapper wrapper) throws Exception {
+    public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
         jobQueue.put(wrapper);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
similarity index 65%
rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java
rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index da8fa4cede..6c2243b261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -20,23 +20,21 @@ package org.apache.doris.statistics;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Objects;
 import java.util.StringJoiner;
 
-public class AnalysisJobInfo {
+public class AnalysisTaskInfo {
 
-    private static final Logger LOG = 
LogManager.getLogger(AnalysisJobInfo.class);
+    private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskInfo.class);
 
-    public enum JobState {
-        PENDING,
-        RUNNING,
-        FINISHED,
-        FAILED;
+
+    public enum AnalysisMethod {
+        SAMPLE,
+        FULL
     }
 
     public enum AnalysisType {
-        SAMPLE,
-        FULL;
+        COLUMN,
+        INDEX
     }
 
     public enum JobType {
@@ -53,6 +51,8 @@ public class AnalysisJobInfo {
 
     public final long jobId;
 
+    public final long taskId;
+
     public final String catalogName;
 
     public final String dbName;
@@ -61,56 +61,43 @@ public class AnalysisJobInfo {
 
     public final String colName;
 
+    public final Long indexId;
+
     public final JobType jobType;
 
-    public AnalysisType analysisType;
+    public final AnalysisMethod analysisMethod;
+
+    public final AnalysisType analysisType;
 
     public String message;
 
     // finished or failed
     public int lastExecTimeInMs = 0;
 
-    private JobState state;
+    public AnalysisState state;
 
     public final ScheduleType scheduleType;
 
-    public AnalysisJobInfo(long jobId, String catalogName, String dbName, 
String tblName, String colName,
-            JobType jobType, ScheduleType scheduleType) {
+    public AnalysisTaskInfo(long jobId, long taskId, String catalogName, 
String dbName, String tblName,
+            String colName, Long indexId, JobType jobType,
+            AnalysisMethod analysisMethod, AnalysisType analysisType, String 
message, int lastExecTimeInMs,
+            AnalysisState state, ScheduleType scheduleType) {
         this.jobId = jobId;
+        this.taskId = taskId;
         this.catalogName = catalogName;
         this.dbName = dbName;
         this.tblName = tblName;
         this.colName = colName;
+        this.indexId = indexId;
         this.jobType = jobType;
+        this.analysisMethod = analysisMethod;
+        this.analysisType = analysisType;
+        this.message = message;
+        this.lastExecTimeInMs = lastExecTimeInMs;
+        this.state = state;
         this.scheduleType = scheduleType;
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(catalogName, dbName, tblName, colName, 
analysisType);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        AnalysisJobInfo other = (AnalysisJobInfo) obj;
-        return catalogName.equals(other.catalogName)
-                && dbName.equals(other.dbName)
-                && tblName.equals(other.tblName)
-                && colName.equals(other.colName)
-                && analysisType.equals(other.analysisType);
-    }
-
-    // TODO: log to meta
-    public void updateState(JobState jobState) {
-        this.state = jobState;
-    }
-
     @Override
     public String toString() {
         StringJoiner sj = new StringJoiner("\n", getClass().getName() + ":\n", 
"\n");
@@ -119,14 +106,15 @@ public class AnalysisJobInfo {
         sj.add("DBName: " + dbName);
         sj.add("TableName: " + tblName);
         sj.add("ColumnName: " + colName);
-        sj.add("JobType: " + analysisType.toString());
+        sj.add("TaskType: " + analysisType.toString());
+        sj.add("TaskMethod: " + analysisMethod.toString());
         sj.add("Message: " + message);
         sj.add("LastExecTime: " + String.valueOf(lastExecTimeInMs));
         sj.add("CurrentState: " + state.toString());
         return sj.toString();
     }
 
-    public JobState getState() {
+    /** Returns the value of the {@code state} field. */
+    public AnalysisState getState() {
         return state;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
new file mode 100644
index 0000000000..cc3b7b62f1
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
+import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
+
+/**
+ * Fluent builder for {@link AnalysisTaskInfo}.
+ *
+ * Each setter stores its argument and returns this builder so calls can be chained;
+ * {@link #build()} hands the collected values to the AnalysisTaskInfo constructor.
+ * Values that were never set keep their Java defaults (0 for the numeric ids and
+ * lastExecTimeInMs, null for every reference-typed field).
+ */
+public class AnalysisTaskInfoBuilder {
+    private long jobId;
+    private long taskId;
+    private String catalogName;
+    private String dbName;
+    private String tblName;
+    private String colName;
+    // Boxed, so it stays null unless setIndexId is called.
+    private Long indexId;
+    private JobType jobType;
+    private AnalysisMethod analysisMethod;
+    private AnalysisType analysisType;
+    private String message;
+    private int lastExecTimeInMs;
+    private AnalysisState state;
+    private ScheduleType scheduleType;
+
+    public AnalysisTaskInfoBuilder setJobId(long jobId) {
+        this.jobId = jobId;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setTaskId(long taskId) {
+        this.taskId = taskId;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setCatalogName(String catalogName) {
+        this.catalogName = catalogName;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setDbName(String dbName) {
+        this.dbName = dbName;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setTblName(String tblName) {
+        this.tblName = tblName;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setColName(String colName) {
+        this.colName = colName;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setIndexId(Long indexId) {
+        this.indexId = indexId;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setJobType(JobType jobType) {
+        this.jobType = jobType;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setAnalysisMethod(AnalysisMethod 
analysisMethod) {
+        this.analysisMethod = analysisMethod;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setAnalysisType(AnalysisType analysisType) {
+        this.analysisType = analysisType;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setMessage(String message) {
+        this.message = message;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setLastExecTimeInMs(int lastExecTimeInMs) {
+        this.lastExecTimeInMs = lastExecTimeInMs;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setState(AnalysisState state) {
+        this.state = state;
+        return this;
+    }
+
+    public AnalysisTaskInfoBuilder setScheduleType(ScheduleType scheduleType) {
+        this.scheduleType = scheduleType;
+        return this;
+    }
+
+    /**
+     * Creates a new {@link AnalysisTaskInfo} from the values currently held by this builder.
+     * The builder itself is not reset or modified.
+     */
+    public AnalysisTaskInfo build() {
+        return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, 
tblName, colName, indexId, jobType,
+                analysisMethod, analysisType, message, lastExecTimeInMs, 
state, scheduleType);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
similarity index 54%
rename from 
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
index 0918339d35..71f23fe955 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
@@ -20,94 +20,63 @@ package org.apache.doris.statistics;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.statistics.AnalysisJobInfo.JobState;
-import org.apache.doris.statistics.AnalysisJobInfo.JobType;
-import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.text.StringSubstitutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
-public class AnalysisJobScheduler {
+public class AnalysisTaskScheduler {
 
-    private static final Logger LOG = 
LogManager.getLogger(AnalysisJobScheduler.class);
+    private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskScheduler.class);
 
-    private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
-            + FeConstants.INTERNAL_DB_NAME + "." + 
StatisticConstants.ANALYSIS_JOB_TABLE + " "
-            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE 
job_id = ${jobId}";
+    private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
+            new 
PriorityQueue<>(Comparator.comparingInt(BaseAnalysisTask::getLastExecTime));
 
-    private final PriorityQueue<AnalysisJob> systemJobQueue =
-            new 
PriorityQueue<AnalysisJob>(Comparator.comparingInt(AnalysisJob::getLastExecTime));
+    private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
 
-    private final Queue<AnalysisJob> manualJobQueue = new LinkedList<>();
+    private final Set<BaseAnalysisTask> systemJobSet = new HashSet<>();
 
-    private final Set<AnalysisJob> systemJobSet = new HashSet<>();
+    private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
 
-    private final Set<AnalysisJob> manualJobSet = new HashSet<>();
-
-    private final AnalysisJobExecutor jobExecutor = new 
AnalysisJobExecutor(this);
-
-    {
-        jobExecutor.start();
-    }
-
-    public void updateJobStatus(long jobId, JobState jobState, String message, 
long time) {
-        Map<String, String> params = new HashMap<>();
-        params.put("jobState", jobState.toString());
-        params.put("message", StringUtils.isNotEmpty(message) ? 
String.format(", message = '%s'", message) : "");
-        params.put("updateExecTime", time == -1 ? "" : ", 
last_exec_time_in_ms=" + time);
-        params.put("jobId", String.valueOf(jobId));
-        try {
-            StatisticsUtil.execUpdate(new 
StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
-        } catch (Exception e) {
-            LOG.warn(String.format("Failed to update state for job: %s", 
jobId), e);
-        }
-
-    }
-
-    public synchronized void scheduleJobs(List<AnalysisJobInfo> 
analysisJobInfos) {
-        for (AnalysisJobInfo job : analysisJobInfos) {
+    /**
+     * Schedules every task description in the list, in iteration order, by calling
+     * {@code schedule} on each one while holding this scheduler's monitor.
+     */
+    public synchronized void scheduleJobs(List<AnalysisTaskInfo> 
analysisJobInfos) {
+        for (AnalysisTaskInfo job : analysisJobInfos) {
             schedule(job);
         }
     }
 
-    public synchronized void schedule(AnalysisJobInfo analysisJobInfo) {
+    /**
+     * Resolves the catalog, database and table named by the task description, asks the table
+     * to create the matching analysis task, and enqueues that task.
+     *
+     * Throws IllegalArgumentException (via Preconditions.checkArgument) when the catalog,
+     * database or table cannot be resolved.
+     */
+    public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) {
         CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName);
         Preconditions.checkArgument(catalog != null);
         DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName);
         Preconditions.checkArgument(db != null);
         TableIf table = db.getTableNullable(analysisJobInfo.tblName);
         Preconditions.checkArgument(table != null);
-        AnalysisJob analysisJob = table.createAnalysisJob(this, 
analysisJobInfo);
-        addToManualJobQueue(analysisJob);
+        BaseAnalysisTask analysisTask = table.createAnalysisTask(this, 
analysisJobInfo);
+        // NOTE(review): every task goes to the manual queue first; tasks whose type is not
+        // MANUAL are then also offered to the system queue. Confirm the double enqueue is intended.
+        addToManualJobQueue(analysisTask);
         if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
             return;
         }
-        addToSystemQueue(analysisJob);
+        addToSystemQueue(analysisTask);
     }
 
-    private void removeFromSystemQueue(AnalysisJob analysisJobInfo) {
-        if (manualJobSet.contains(analysisJobInfo)) {
-            systemJobQueue.remove(analysisJobInfo);
-            manualJobSet.remove(analysisJobInfo);
-        }
+    /**
+     * Removes a task from the system queue and from the set that tracks which tasks are
+     * currently in that queue. Does nothing when the task is not tracked as a system task.
+     *
+     * The membership check uses {@code systemJobSet}, the same set that addToSystemQueue
+     * consults, so the manual bookkeeping set is left untouched.
+     */
+    private void removeFromSystemQueue(BaseAnalysisTask analysisJobInfo) {
+        // Set.remove reports whether the element was present, so one call both checks and clears.
+        if (systemJobSet.remove(analysisJobInfo)) {
+            systemJobQueue.remove(analysisJobInfo);
+        }
     }
 
-    private void addToSystemQueue(AnalysisJob analysisJobInfo) {
+    private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) {
         if (systemJobSet.contains(analysisJobInfo)) {
             return;
         }
@@ -116,7 +85,7 @@ public class AnalysisJobScheduler {
         notify();
     }
 
-    private void addToManualJobQueue(AnalysisJob analysisJobInfo) {
+    private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) {
         if (manualJobSet.contains(analysisJobInfo)) {
             return;
         }
@@ -125,7 +94,7 @@ public class AnalysisJobScheduler {
         notify();
     }
 
-    public synchronized AnalysisJob getPendingJobs() {
+    public synchronized BaseAnalysisTask getPendingTasks() {
         while (true) {
             if (!manualJobQueue.isEmpty()) {
                 return manualJobQueue.poll();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
similarity index 63%
rename from 
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index 244a53b187..3ca55dbd9e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -18,60 +18,66 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.statistics.AnalysisJobInfo.JobState;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 
-public class AnalysisJobWrapper extends FutureTask<Void> {
+public class AnalysisTaskWrapper extends FutureTask<Void> {
 
-    private static final Logger LOG = 
LogManager.getLogger(AnalysisJobWrapper.class);
+    private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskWrapper.class);
 
-    private final AnalysisJob job;
+    private final BaseAnalysisTask task;
 
     private long startTime;
 
-    private final AnalysisJobExecutor executor;
+    private final AnalysisTaskExecutor executor;
 
-    public AnalysisJobWrapper(AnalysisJobExecutor executor, AnalysisJob job) {
+    /**
+     * Wraps an analysis task in a FutureTask whose callable invokes {@code job.execute()} and
+     * returns null, and remembers the executor and the task for use in run() and cancel().
+     */
+    public AnalysisTaskWrapper(AnalysisTaskExecutor executor, BaseAnalysisTask 
job) {
         super(() -> {
             job.execute();
             return null;
         });
         this.executor = executor;
-        this.job = job;
+        this.task = job;
     }
 
     @Override
     public void run() {
         startTime = System.currentTimeMillis();
-        Exception except = null;
+        Throwable except = null;
         try {
             executor.putJob(this);
             super.run();
+            Object result = get();
+            if (result instanceof Throwable) {
+                except = (Throwable) result;
+            }
         } catch (Exception e) {
             except = e;
         } finally {
             executor.decr();
             if (except != null) {
-                Env.getCurrentEnv().getAnalysisJobScheduler()
-                        .updateJobStatus(job.getJobId(), JobState.FAILED, 
except.getMessage(), -1);
+                LOG.warn("Failed to execute task", except);
+                Env.getCurrentEnv().getAnalysisManager()
+                        .updateTaskStatus(task.info,
+                                AnalysisState.FAILED, except.getMessage(), -1);
             } else {
-                Env.getCurrentEnv().getAnalysisJobScheduler()
-                        .updateJobStatus(job.getJobId(), JobState.FINISHED, 
"", System.currentTimeMillis());
+                Env.getCurrentEnv().getAnalysisManager()
+                        .updateTaskStatus(task.info,
+                                AnalysisState.FINISHED, "", 
System.currentTimeMillis());
             }
-            LOG.warn("{} finished, cost time:{}", job.toString(), 
System.currentTimeMillis() - startTime);
+            LOG.warn("{} finished, cost time:{}", task.toString(), 
System.currentTimeMillis() - startTime);
         }
     }
 
     public boolean cancel() {
         try {
-            LOG.warn("{} cancelled, cost time:{}", job.toString(), 
System.currentTimeMillis() - startTime);
-            job.cancel();
+            LOG.warn("{} cancelled, cost time:{}", task.toString(), 
System.currentTimeMillis() - startTime);
+            task.cancel();
         } catch (Exception e) {
-            LOG.warn(String.format("Cancel job failed job info : %s", 
job.toString()));
+            LOG.warn(String.format("Cancel job failed job info : %s", 
task.toString()));
         } finally {
             executor.decr();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
new file mode 100644
index 0000000000..79b0ec8c16
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Common base of the statistics-collection tasks. It holds the SQL templates shared by the
+ * concrete tasks and resolves the catalog, database, table and (for column analysis) column
+ * named by an {@link AnalysisTaskInfo}.
+ *
+ * Resolution failures are reported through the analysis manager as FAILED; the corresponding
+ * field is then left null, so subclasses must not assume {@code tbl} or {@code col} is set.
+ */
+public abstract class BaseAnalysisTask {
+
+    /** Head of the INSERT that writes one partition's statistics; subclasses append the FROM clause. */
+    protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "'${colId}' AS col_id, "
+            + "${partId} AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(`${colName}`) AS ndv, "
+            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "MIN(`${colName}`) AS min, "
+            + "MAX(`${colName}`) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW() ";
+
+    /**
+     * Head of the INSERT that folds the per-partition rows into one column-level row (sub-query
+     * {@code t1}); subclasses append a second sub-query {@code t2} that supplies {@code ndv}.
+     */
+    protected static final String INSERT_COL_STATISTICS = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + "    SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, "
+            + "        ndv, null_count, min, max, data_size, update_time\n"
+            + "    FROM \n"
+            + "     (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, "
+            + "         ${catalogId} AS catalog_id, "
+            + "         ${dbId} AS db_id, "
+            + "         ${tblId} AS tbl_id, "
+            + "         '${colId}' AS col_id, "
+            + "         NULL AS part_id, "
+            + "         SUM(count) AS row_count, \n"
+            + "         SUM(null_count) AS null_count, "
+            + "         MIN(CAST(min AS ${type})) AS min, "
+            + "         MAX(CAST(max AS ${type})) AS max, "
+            + "         SUM(data_size_in_bytes) AS data_size, "
+            + "         NOW() AS update_time\n"
+            + "     FROM ${internalDB}.${columnStatTbl}"
+            + "     WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND "
+            + "     ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
+            + "     ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
+            + "     ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
+            + "     ) t1, \n";
+
+    protected AnalysisTaskScheduler analysisTaskScheduler;
+
+    protected AnalysisTaskInfo info;
+
+    protected CatalogIf catalog;
+
+    protected DatabaseIf db;
+
+    protected TableIf tbl;
+
+    protected Column col;
+
+    protected StmtExecutor stmtExecutor;
+
+    protected AnalysisState analysisState;
+
+    /** Leaves every field unset; intended for tests only. */
+    @VisibleForTesting
+    public BaseAnalysisTask() {
+
+    }
+
+    /**
+     * Stores the scheduler and task description, then resolves the catalog objects the
+     * description refers to.
+     */
+    public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
+        this.analysisTaskScheduler = analysisTaskScheduler;
+        this.info = info;
+        init(info);
+    }
+
+    /**
+     * Resolves catalog, database, table and column in that order. Resolution stops at the first
+     * object that cannot be found, after marking the task FAILED with a message naming it.
+     */
+    private void init(AnalysisTaskInfo info) {
+        catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
+        if (catalog == null) {
+            // Report the catalog name that was looked up, not the database name.
+            failInit(info, String.format("Catalog with name: %s not exists", info.catalogName));
+            return;
+        }
+        db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
+        if (db == null) {
+            failInit(info, String.format("DB with name %s not exists", info.dbName));
+            return;
+        }
+        tbl = (TableIf) db.getTable(info.tblName).orElse(null);
+        if (tbl == null) {
+            failInit(info, String.format("Table with name %s not exists", info.tblName));
+            // Without a table the column lookup below would dereference null.
+            return;
+        }
+        if (info.analysisType != null && info.analysisType.equals(AnalysisType.COLUMN)) {
+            col = tbl.getColumn(info.colName);
+            if (col == null) {
+                // Report the column name that was looked up, not the table name.
+                failInit(info, String.format("Column with name %s not exists", info.colName));
+            }
+        }
+    }
+
+    /** Marks the task FAILED with the given message, stamped with the current time. */
+    private static void failInit(AnalysisTaskInfo info, String message) {
+        Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
+                info, AnalysisState.FAILED, message, System.currentTimeMillis());
+    }
+
+    /** Collects the statistics this task is responsible for. */
+    public abstract void execute() throws Exception;
+
+    /**
+     * Cancels the running statement, if there is one, and marks the task FAILED with a
+     * message stating that it was cancelled.
+     */
+    public void cancel() {
+        if (stmtExecutor != null) {
+            stmtExecutor.cancel();
+        }
+        Env.getCurrentEnv().getAnalysisManager()
+                .updateTaskStatus(info, AnalysisState.FAILED,
+                        String.format("Job has been cancelled: %s", info.toString()), -1);
+    }
+
+    /** Returns {@code info.lastExecTimeInMs}. */
+    public int getLastExecTime() {
+        return info.lastExecTimeInMs;
+    }
+
+    /** Returns {@code info.jobId}. */
+    public long getJobId() {
+        return info.jobId;
+    }
+
+    /** Returns the {@code analysisState} field; nothing in this class assigns it. */
+    public AnalysisState getAnalysisState() {
+        return analysisState;
+    }
+
+    /**
+     * Returns the SQL expression used for the data_size value: the summed length for string
+     * columns (still containing the {@code ${colName}} placeholder), otherwise the row count
+     * multiplied by the type's slot size.
+     */
+    protected String getDataSizeFunction(Column column) {
+        if (column.getType().isStringType()) {
+            return "SUM(LENGTH(`${colName}`))";
+        }
+        return "COUNT(1) * " + column.getType().getSlotSize();
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
similarity index 89%
rename from 
fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index c92d92dac9..9a6dc93564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -22,12 +22,12 @@ import org.apache.doris.common.Config;
 
 import org.apache.commons.lang.NotImplementedException;
 
-public class HMSAnalysisJob extends AnalysisJob {
+public class HMSAnalysisTask extends BaseAnalysisTask {
 
     protected HMSExternalTable table;
 
-    public HMSAnalysisJob(AnalysisJobScheduler analysisJobScheduler, 
AnalysisJobInfo info) {
-        super(analysisJobScheduler, info);
+    /**
+     * Delegates to the BaseAnalysisTask constructor, then keeps the inherited {@code tbl}
+     * cast to HMSExternalTable in {@code table}.
+     */
+    public HMSAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, 
AnalysisTaskInfo info) {
+        super(analysisTaskScheduler, info);
         table = (HMSExternalTable) tbl;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
similarity index 97%
rename from 
fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
index 216cafcb89..7bba21c692 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
@@ -43,16 +43,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class HiveAnalysisJob extends HMSAnalysisJob {
-    private static final Logger LOG = 
LogManager.getLogger(HiveAnalysisJob.class);
+public class HiveAnalysisTask extends HMSAnalysisTask {
+    private static final Logger LOG = 
LogManager.getLogger(HiveAnalysisTask.class);
 
     public static final String TOTAL_SIZE = "totalSize";
     public static final String NUM_ROWS = "numRows";
     public static final String NUM_FILES = "numFiles";
     public static final String TIMESTAMP = "transient_lastDdlTime";
 
-    public HiveAnalysisJob(AnalysisJobScheduler analysisJobScheduler, 
AnalysisJobInfo info) {
-        super(analysisJobScheduler, info);
+    /** Passes both arguments unchanged to the HMSAnalysisTask constructor. */
+    public HiveAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, 
AnalysisTaskInfo info) {
+        super(analysisTaskScheduler, info);
     }
 
     private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
similarity index 96%
rename from 
fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
index a62052f8bf..b51fa4eb53 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
@@ -38,14 +38,14 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-public class IcebergAnalysisJob extends HMSAnalysisJob {
+public class IcebergAnalysisTask extends HMSAnalysisTask {
 
     private long numRows = 0;
     private long dataSize = 0;
     private long numNulls = 0;
 
-    public IcebergAnalysisJob(AnalysisJobScheduler analysisJobScheduler, 
AnalysisJobInfo info) {
-        super(analysisJobScheduler, info);
+    /** Passes both arguments unchanged to the HMSAnalysisTask constructor. */
+    public IcebergAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, 
AnalysisTaskInfo info) {
+        super(analysisTaskScheduler, info);
     }
 
     private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
new file mode 100644
index 0000000000..70f4d0c5f3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.SelectListItem;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.google.common.base.Preconditions;
+
+import java.io.StringReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Analysis task for a materialized view. It is only constructed when the AnalyzeStmt does not
+ * specify which columns are to be analyzed.
+ *
+ * The task re-parses the view's defining statement and, for every column of the view, runs that
+ * SELECT once per partition and once over the whole table to fill the column statistics table.
+ * TODO: Supports multi-table mv
+ */
+public class MVAnalysisTask extends BaseAnalysisTask {
+
+    private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS
+            + " FROM (${sql}) mv";
+
+    private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS
+            + "     (SELECT NDV(`${colName}`) AS ndv "
+            + "     FROM (${sql}) mv) t2\n";
+
+    private MaterializedIndexMeta meta;
+
+    private SelectStmt selectStmt;
+
+    private OlapTable olapTable;
+
+    public MVAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
+        super(analysisTaskScheduler, info);
+        init();
+    }
+
+    /**
+     * Looks up the index meta for {@code info.indexId}, parses its defining statement and
+     * qualifies the first table reference of the SELECT with the database name.
+     */
+    private void init() {
+        olapTable = (OlapTable) tbl;
+        meta = olapTable.getIndexMetaByIndexId(info.indexId);
+        Preconditions.checkState(meta != null);
+        String mvDef = meta.getDefineStmt().originStmt;
+        SqlScanner input =
+                new SqlScanner(new StringReader(mvDef), 0L);
+        SqlParser parser = new SqlParser(input);
+        CreateMaterializedViewStmt cmv = null;
+        try {
+            cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        selectStmt = cmv.getSelectStmt();
+        selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName());
+    }
+
+    /**
+     * For every column of the view: writes one statistics row per partition, then one
+     * column-level row, and finally refreshes the statistics cache entry for that column.
+     */
+    @Override
+    public void execute() throws Exception {
+        for (Column column : meta.getSchema()) {
+            String colName = column.getName();
+            // Statement whose partition restriction is re-pointed on every loop iteration.
+            SelectStmt selectOne = cloneSelectFor(column);
+            TableRef tableRef = selectOne.getTableRefs().get(0);
+            // Rendered from its own clone so it never carries a partition restriction: the
+            // column-level NDV must be computed over the whole view, not over one partition.
+            String wholeViewSql = cloneSelectFor(column).toSql();
+
+            // Parameters that do not change from partition to partition.
+            Map<String, String> params = new HashMap<>();
+            params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+            params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+            params.put("catalogId", String.valueOf(catalog.getId()));
+            params.put("dbId", String.valueOf(db.getId()));
+            params.put("tblId", String.valueOf(meta.getIndexId()));
+            params.put("colId", colName);
+            params.put("dataSizeFunction", getDataSizeFunction(column));
+            params.put("dbName", info.dbName);
+            params.put("colName", colName);
+            params.put("tblName", String.valueOf(info.tblName));
+
+            for (Partition part : olapTable.getAllPartitions()) {
+                tableRef.setPartitionNames(new PartitionNames(false, Arrays.asList(part.getName())));
+                params.put("partId", String.valueOf(part.getId()));
+                params.put("sql", selectOne.toSql());
+                StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
+            }
+
+            params.remove("partId");
+            params.put("sql", wholeViewSql);
+            params.put("type", column.getType().toString());
+            StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
+            Env.getCurrentEnv().getStatisticsCache().refreshSync(meta.getIndexId(), column.getName());
+        }
+    }
+
+    /**
+     * Clones the view's SELECT and aliases the select item that produces {@code column} with the
+     * column's name, so the statistics templates can refer to it by that name.
+     *
+     * @throws IllegalStateException when no select item corresponds to the column
+     */
+    private SelectStmt cloneSelectFor(Column column) {
+        SelectStmt copy = (SelectStmt) selectStmt.clone();
+        SelectListItem selectItem = copy.getSelectList().getItems()
+                .stream()
+                .filter(i -> isCorrespondingToColumn(i, column))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException(String.format(
+                        "No select item of materialized view %s corresponds to column %s",
+                        meta.getIndexId(), column.getName())));
+        selectItem.setAlias(column.getName());
+        return copy;
+    }
+
+    //  Based on the fact that materialized view create statement's select expr only contains basic SlotRef and
+    //  AggregateFunction.
+    //  An aggregate whose first argument is not a plain SlotRef is treated as "no match" instead of
+    //  failing on the cast.
+    private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
+        Expr expr = item.getExpr();
+        if (expr instanceof SlotRef) {
+            SlotRef slotRef = (SlotRef) expr;
+            return slotRef.getColumnName().equalsIgnoreCase(column.getName());
+        }
+        if (expr instanceof FunctionCallExpr) {
+            FunctionCallExpr func = (FunctionCallExpr) expr;
+            Expr arg = func.getChild(0);
+            return arg instanceof SlotRef
+                    && ((SlotRef) arg).getColumnName().equalsIgnoreCase(column.getName());
+        }
+        return false;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
new file mode 100644
index 0000000000..23ea5ea6a6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.text.StringSubstitutor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Each task analyzes one column.
+ *
+ * <p>{@link #execute()} issues one statement per partition built from
+ * {@code ANALYZE_PARTITION_SQL_TEMPLATE}, then a single statement built from
+ * {@code ANALYZE_COLUMN_SQL_TEMPLATE}, and finally refreshes the statistics cache entry of the column.
+ */
+public class OlapAnalysisTask extends BaseAnalysisTask {
+
+    // Statement executed once per partition; placeholders are filled by StringSubstitutor in execute().
+    private static final String ANALYZE_PARTITION_SQL_TEMPLATE = INSERT_PART_STATISTICS
+            + "FROM `${dbName}`.`${tblName}` "
+            + "PARTITION ${partName}";
+
+    // Statement executed once per column; the sub-query computes NDV over the whole table.
+    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS
+            + "     (SELECT NDV(`${colName}`) AS ndv "
+            + "     FROM `${dbName}`.`${tblName}`) t2\n";
+
+    @VisibleForTesting
+    public OlapAnalysisTask() {
+        super();
+    }
+
+    public OlapAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
+        super(analysisTaskScheduler, info);
+    }
+
+    /**
+     * Collects the statistics of the column described by {@code info}.
+     *
+     * <p>The per-partition statements are generated while holding the table read lock and executed
+     * after the lock has been released; the column-level statement is executed afterwards.
+     *
+     * @throws Exception if executing any of the generated statements fails
+     */
+    public void execute() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        params.put("catalogId", String.valueOf(catalog.getId()));
+        params.put("dbId", String.valueOf(db.getId()));
+        params.put("tblId", String.valueOf(tbl.getId()));
+        params.put("colId", String.valueOf(info.colName));
+        params.put("dataSizeFunction", getDataSizeFunction(col));
+        params.put("dbName", info.dbName);
+        params.put("colName", String.valueOf(info.colName));
+        params.put("tblName", String.valueOf(info.tblName));
+        List<String> partitionAnalysisSQLs = new ArrayList<>();
+        // Acquire the lock before entering try, so that finally never releases a lock that was not taken.
+        tbl.readLock();
+        try {
+            Set<String> partNames = tbl.getPartitionNames();
+            for (String partName : partNames) {
+                Partition part = tbl.getPartition(partName);
+                if (part == null) {
+                    continue;
+                }
+                // Reuse the partition fetched above instead of looking it up a second time.
+                params.put("partId", String.valueOf(part.getId()));
+                params.put("partName", partName);
+                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+                partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+            }
+        } finally {
+            tbl.readUnlock();
+        }
+        execSQLs(partitionAnalysisSQLs);
+        // The column-level statement is not bound to a single partition.
+        params.remove("partId");
+        params.put("type", col.getType().toString());
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
+        execSQL(sql);
+        Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName());
+    }
+
+    /**
+     * Executes the given statements one after another, stopping at the first failure.
+     *
+     * @param partitionAnalysisSQLs statements to execute, in order
+     * @throws Exception if executing a statement fails
+     */
+    @VisibleForTesting
+    public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
+        for (String sql : partitionAnalysisSQLs) {
+            execSQL(sql);
+        }
+    }
+
+    /**
+     * Executes one statement in a connect context obtained from {@code StatisticsUtil.buildConnectContext()};
+     * the context is closed when the statement finishes or fails.
+     *
+     * @param sql statement to execute
+     * @throws Exception if execution fails
+     */
+    @VisibleForTesting
+    public void execSQL(String sql) throws Exception {
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+            // The executor is stored in a field rather than a local variable.
+            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            this.stmtExecutor.execute();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
index c369da43b9..62cc5638ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -260,21 +259,6 @@ public class StatisticsJob {
         }
     }
 
-    /**
-     * get statisticsJob from analyzeStmt.
-     * AnalyzeStmt: analyze t1(c1,c2,c3)
-     * tableId: [t1]
-     * tableIdToColumnName: {t1: [c1,c2,c3]}
-     */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt stmt) throws 
AnalysisException {
-        long dbId = stmt.getDbId();
-        Set<Long> tblIds = stmt.getTblIds();
-        Map<Long, List<String>> tableIdToPartitionName = 
stmt.getTableIdToPartitionName();
-        Map<Long, List<String>> tableIdToColumnName = 
stmt.getTableIdToColumnName();
-        Map<String, String> properties = stmt.getProperties();
-        return new StatisticsJob(dbId, tblIds, tableIdToPartitionName, 
tableIdToColumnName, properties);
-    }
-
     public List<Comparable> getShowInfo(@Nullable Long tableId) throws 
AnalysisException {
         List<Comparable> result = Lists.newArrayList();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
index 37966dc1fe..dbdc202f0e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
@@ -17,20 +17,16 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.ShowAnalyzeStmt;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.OrderByPair;
-import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -61,37 +57,6 @@ public class StatisticsJobManager {
         return idToStatisticsJob;
     }
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws 
UserException {
-        // The current statistics are only used for CBO test,
-        // and are not available to users. (work in progress)
-        // TODO(wzt): Further tests are needed
-        boolean enableCboStatistics = ConnectContext.get()
-                .getSessionVariable().getEnableCboStatistics();
-        if (enableCboStatistics) {
-            // step1: init statistics job by analyzeStmt
-            StatisticsJob statisticsJob = 
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-            synchronized (this) {
-                // step2: check restrict
-                checkRestrict(analyzeStmt.getDbId(), 
statisticsJob.getTblIds());
-                // step3: create it
-                createStatisticsJob(statisticsJob);
-            }
-        } else {
-            throw new UserException("Statistics are not yet stable, if you 
want to enable statistics,"
-                    + " use 'set enable_cbo_statistics=true' to enable it.");
-        }
-    }
-
-    public void createStatisticsJob(StatisticsJob statisticsJob) throws 
DdlException {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
-        try {
-            
Env.getCurrentEnv().getStatisticsJobScheduler().addPendingJob(statisticsJob);
-        } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it 
again later.");
-            throw new DdlException("The pending statistics job is full, Please 
submit it again later.");
-        }
-    }
-
     /**
      * The statistical job has the following restrict:
      * - Rule1: The same table cannot have two unfinished statistics jobs
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index d1b9c8f4f4..715ec14464 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -18,16 +18,12 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AlterColumnStatsStmt;
-import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.statistics.AnalysisJobInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisJobInfo.JobState;
-import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
 import org.apache.doris.statistics.util.DBObjects;
 import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.statistics.util.StatisticsUtil;
@@ -71,8 +67,9 @@ public class StatisticsRepository {
             + " WHERE `id` IN (${idList})";
 
     private static final String PERSIST_ANALYSIS_JOB_SQL_TEMPLATE = "INSERT INTO "
-            + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, '${catalogName}', '${dbName}',"
-            + "'${tblName}','${colName}', '${jobType}', '${analysisType}', '${message}', '${lastExecTimeInMs}',"
+            // No column list is given, so values are bound to the table's columns by position.
+            + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, ${taskId}, '${catalogName}', '${dbName}',"
+            + "'${tblName}','${colName}', '${indexId}','${jobType}', '${analysisType}', "
+            + "'${message}', '${lastExecTimeInMs}',"
             + "'${state}', '${scheduleType}')";
 
     private static final String INSERT_INTO_COLUMN_STATISTICS = "INSERT INTO "
@@ -134,39 +131,23 @@ public class StatisticsRepository {
         return stringJoiner.toString();
     }
 
-    public static void createAnalysisJob(AnalyzeStmt analyzeStmt) {
-        String catalogName = analyzeStmt.getCatalogName();
-        String db = analyzeStmt.getDBName();
-        String tbl = analyzeStmt.getTblName();
-        List<String> colNames = analyzeStmt.getOptColumnNames();
-
-        if (colNames != null) {
-            for (String colName : colNames) {
-                AnalysisJobInfo analysisJobInfo = new 
AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db,
-                        tbl, colName, AnalysisJobInfo.JobType.MANUAL, 
ScheduleType.ONCE);
-                analysisJobInfo.analysisType = AnalysisType.FULL;
-                Map<String, String> params = new HashMap<>();
-                params.put("jobId", String.valueOf(analysisJobInfo.jobId));
-                params.put("catalogName", analysisJobInfo.catalogName);
-                params.put("dbName", analysisJobInfo.dbName);
-                params.put("tblName", analysisJobInfo.tblName);
-                params.put("colName", analysisJobInfo.colName);
-                params.put("jobType", analysisJobInfo.jobType.toString());
-                params.put("analysisType", 
analysisJobInfo.analysisType.toString());
-                params.put("message", "");
-                params.put("lastExecTimeInMs", "0");
-                params.put("state", JobState.PENDING.toString());
-                params.put("scheduleType", 
analysisJobInfo.scheduleType.toString());
-                try {
-                    StatisticsUtil.execUpdate(
-                            new 
StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE));
-                } catch (Exception e) {
-                    LOG.warn("Failed to persite job for column: {}", colName, 
e);
-                    return;
-                }
-                
Env.getCurrentEnv().getAnalysisJobScheduler().schedule(analysisJobInfo);
-            }
-        }
+    /**
+     * Persists one analysis task by filling {@code PERSIST_ANALYSIS_JOB_SQL_TEMPLATE} with the fields of
+     * the task and executing the resulting statement.
+     *
+     * <p>The message is stored empty, the last execution time as "0" and the state as PENDING.
+     *
+     * @param analysisTaskInfo task to persist
+     * @throws Exception if executing the insert statement fails
+     */
+    public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
+        Map<String, String> templateArgs = new HashMap<>();
+        // Identity of the task.
+        templateArgs.put("jobId", String.valueOf(analysisTaskInfo.jobId));
+        templateArgs.put("taskId", String.valueOf(analysisTaskInfo.taskId));
+        // Object the task works on.
+        templateArgs.put("catalogName", analysisTaskInfo.catalogName);
+        templateArgs.put("dbName", analysisTaskInfo.dbName);
+        templateArgs.put("tblName", analysisTaskInfo.tblName);
+        templateArgs.put("colName", analysisTaskInfo.colName);
+        templateArgs.put("indexId", String.valueOf(analysisTaskInfo.indexId));
+        // How the task is run.
+        templateArgs.put("jobType", analysisTaskInfo.jobType.toString());
+        templateArgs.put("analysisType", analysisTaskInfo.analysisMethod.toString());
+        // Initial bookkeeping values of a freshly created task.
+        templateArgs.put("message", "");
+        templateArgs.put("lastExecTimeInMs", "0");
+        templateArgs.put("state", AnalysisState.PENDING.toString());
+        templateArgs.put("scheduleType", analysisTaskInfo.scheduleType.toString());
+        String insertSql = new StringSubstitutor(templateArgs).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE);
+        StatisticsUtil.execUpdate(insertSql);
     }
 
     public static void alterColumnStatistics(AlterColumnStatsStmt 
alterColumnStatsStmt) throws Exception {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index baa5ff1395..16a4a66ffc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.statistics.util;
 
-
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BoolLiteral;
 import org.apache.doris.analysis.DateLiteral;
@@ -46,7 +45,7 @@ import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.AnalysisJobInfo;
+import org.apache.doris.statistics.AnalysisTaskInfo;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
@@ -90,13 +89,11 @@ public class StatisticsUtil {
             StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, 
sql);
             r.connectContext.setExecutor(stmtExecutor);
             stmtExecutor.execute();
-        } finally {
-            ConnectContext.remove();
         }
     }
 
     // TODO: finish this.
-    public static List<AnalysisJobInfo> 
deserializeToAnalysisJob(List<ResultRow> resultBatches) throws TException {
+    public static List<AnalysisTaskInfo> 
deserializeToAnalysisJob(List<ResultRow> resultBatches) throws TException {
         return new ArrayList<>();
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java
index 019d4d0d90..9e0b8c2a46 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java
@@ -41,7 +41,7 @@ public class SqlModeTest {
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
-        Assert.assertEquals("SELECT  FROM `db1`.`tbl1` WHERE `name` = 'BILL 
GATES'", selectStmt.toSql());
+        Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL 
GATES'", selectStmt.toSql());
 
         parser = new SqlParser(new SqlScanner(new StringReader(stmt), 
SqlModeHelper.MODE_DEFAULT));
         try {
@@ -49,7 +49,7 @@ public class SqlModeTest {
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
-        Assert.assertEquals("SELECT  FROM `db1`.`tbl1` WHERE `name` = 'BILL 
GATES'", selectStmt.toSql());
+        Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL 
GATES'", selectStmt.toSql());
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
index e2cc43a617..658b9d8ba1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
@@ -28,7 +28,6 @@ import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.InternalSchemaInitializer;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
@@ -96,7 +95,6 @@ public class TabletRepairAndBalanceTest {
 
     static {
         try {
-            InternalSchemaInitializer.forTest = true;
             tag1 = Tag.create(Tag.TYPE_LOCATION, "zone1");
             tag2 = Tag.create(Tag.TYPE_LOCATION, "zone2");
         } catch (AnalysisException e) {
@@ -106,6 +104,7 @@ public class TabletRepairAndBalanceTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
         System.out.println(runningDir);
         FeConstants.runningUnitTest = true;
         FeConstants.tablet_checker_interval_ms = 1000;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
index f7b5e0eda3..8d90e1a82e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
@@ -21,7 +21,6 @@ import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.InternalSchemaInitializer;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
@@ -72,7 +71,7 @@ public class TabletReplicaTooSlowTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
-        InternalSchemaInitializer.forTest = true;
+        FeConstants.runningUnitTest = true;
         System.out.println(runningDir);
         FeConstants.runningUnitTest = true;
         FeConstants.tablet_checker_interval_ms = 1000;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 25960fa5d4..95fb22eac6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -27,6 +27,7 @@ import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
@@ -38,6 +39,16 @@ public class DecommissionBackendTest extends 
TestWithFeService {
         return 3;
     }
 
+    @Override
+    protected void beforeCluster() {
+        FeConstants.runningUnitTest = true;
+    }
+
+    @BeforeAll
+    public void beforeClass() {
+        FeConstants.runningUnitTest = true;
+    }
+
     @Override
     protected void beforeCreatingConnectContext() throws Exception {
         FeConstants.default_scheduler_interval_millisecond = 1000;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java
index 691da60173..0c43ee6338 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java
@@ -17,9 +17,18 @@
 
 package org.apache.doris.nereids.datasets.ssb;
 
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
 
+import org.junit.jupiter.api.BeforeAll;
+
 public abstract class SSBTestBase extends AnalyzeCheckTestBase {
+
+    @BeforeAll
+    public void beforeClass() {
+        FeConstants.runningUnitTest = true;
+    }
+
     @Override
     protected void runBeforeAll() throws Exception {
         createDatabase("test");
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java
deleted file mode 100644
index e16f416368..0000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.InternalSchemaInitializer;
-import org.apache.doris.common.jmockit.Deencapsulation;
-import org.apache.doris.statistics.AnalysisJobInfo.JobType;
-import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
-import org.apache.doris.statistics.util.BlockingCounter;
-import org.apache.doris.utframe.TestWithFeService;
-
-import mockit.Expectations;
-import mockit.Mocked;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.BlockingQueue;
-
-public class AnalysisJobExecutorTest extends TestWithFeService {
-
-    @Mocked
-    AnalysisJobScheduler analysisJobScheduler;
-
-    @Override
-    protected void runBeforeAll() throws Exception {
-        try {
-            InternalSchemaInitializer.createDB();
-            createDatabase("analysis_job_test");
-            connectContext.setDatabase("default_cluster:analysis_job_test");
-            createTable("CREATE TABLE t1 (col1 int not null, col2 int not 
null, col3 int not null)\n"
-
-                    + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
-                    + "PROPERTIES(\n" + "    \"replication_num\"=\"1\"\n"
-                    + ");");
-            InternalSchemaInitializer storageInitializer = new 
InternalSchemaInitializer();
-            
Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testExpiredJobCancellation() throws Exception {
-        AnalysisJobExecutor analysisJobExecutor = new 
AnalysisJobExecutor(analysisJobScheduler);
-        BlockingQueue<AnalysisJobWrapper> b = 
Deencapsulation.getField(analysisJobExecutor, "jobQueue");
-        AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0,
-                "internal",
-                "default_cluster:analysis_job_test",
-                "t1",
-                "col1", JobType.MANUAL,
-                ScheduleType.ONCE);
-        AnalysisJob analysisJob = new AnalysisJob(analysisJobScheduler, 
analysisJobInfo);
-        AnalysisJobWrapper analysisJobWrapper = new 
AnalysisJobWrapper(analysisJobExecutor, analysisJob);
-        Deencapsulation.setField(analysisJobWrapper, "startTime", 5);
-        b.put(analysisJobWrapper);
-        new Expectations() {
-            {
-                analysisJobWrapper.cancel();
-                times = 1;
-            }
-        };
-        analysisJobExecutor.start();
-        BlockingCounter counter = 
Deencapsulation.getField(analysisJobExecutor, "blockingCounter");
-        Assertions.assertEquals(0, counter.getVal());
-    }
-
-    @Test
-    public void testJobExecution() throws Exception {
-        AnalysisJobExecutor analysisJobExecutor = new 
AnalysisJobExecutor(analysisJobScheduler);
-        AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0,
-                "internal",
-                "default_cluster:analysis_job_test",
-                "t1",
-                "col1", JobType.MANUAL,
-                ScheduleType.ONCE);
-        AnalysisJob job = new AnalysisJob(analysisJobScheduler, 
analysisJobInfo);
-        new Expectations() {
-            {
-                analysisJobScheduler.getPendingJobs();
-                result = job;
-                job.execute();
-                times = 1;
-            }
-        };
-        Deencapsulation.invoke(analysisJobExecutor, "doFetchAndExecute");
-    }
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index dfe7617b60..a01dd8f7cf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -22,8 +22,9 @@ import org.apache.doris.catalog.InternalSchemaInitializer;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.AnalysisJobInfo.JobType;
-import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.utframe.TestWithFeService;
 
@@ -54,10 +55,10 @@ public class AnalysisJobTest extends TestWithFeService {
     }
 
     @Test
-    public void testCreateAnalysisJob(@Mocked AnalysisJobScheduler scheduler) 
throws Exception {
+    public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) 
throws Exception {
         new Expectations() {
             {
-                scheduler.schedule((AnalysisJobInfo) any);
+                scheduler.schedule((AnalysisTaskInfo) any);
                 times = 3;
             }
         };
@@ -86,7 +87,7 @@ public class AnalysisJobTest extends TestWithFeService {
     }
 
     @Test
-    public void testJobExecution(@Mocked AnalysisJobScheduler scheduler, 
@Mocked StmtExecutor stmtExecutor)
+    public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, 
@Mocked StmtExecutor stmtExecutor)
             throws Exception {
         new MockUp<StatisticsUtil>() {
 
@@ -105,13 +106,12 @@ public class AnalysisJobTest extends TestWithFeService {
                 times = 2;
             }
         };
-        AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0,
-                "internal",
-                "default_cluster:analysis_job_test",
-                "t1",
-                "col1", JobType.MANUAL,
-                ScheduleType.ONCE);
-        new AnalysisJob(scheduler, analysisJobInfo).execute();
+        AnalysisTaskInfo analysisJobInfo = new 
AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
+                
.setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1")
+                
.setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
+                        AnalysisType.COLUMN)
+                .build();
+        new OlapAnalysisTask(scheduler, analysisJobInfo).execute();
     }
 
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
new file mode 100644
index 0000000000..f9fcbaf55f
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
+import org.apache.doris.statistics.util.BlockingCounter;
+import org.apache.doris.utframe.TestWithFeService;
+
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+public class AnalysisTaskExecutorTest extends TestWithFeService {
+
+    @Mocked
+    AnalysisTaskScheduler analysisTaskScheduler;
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        try {
+            InternalSchemaInitializer.createDB();
+            createDatabase("analysis_job_test");
+            connectContext.setDatabase("default_cluster:analysis_job_test");
+            createTable("CREATE TABLE t1 (col1 int not null, col2 int not 
null, col3 int not null)\n"
+
+                    + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
+                    + "PROPERTIES(\n" + "    \"replication_num\"=\"1\"\n"
+                    + ");");
+            InternalSchemaInitializer storageInitializer = new 
InternalSchemaInitializer();
+            
Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testExpiredJobCancellation() throws Exception {
+        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(analysisTaskScheduler);
+        BlockingQueue<AnalysisTaskWrapper> b = 
Deencapsulation.getField(analysisTaskExecutor, "jobQueue");
+        AnalysisTaskInfo analysisJobInfo = new 
AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
+                
.setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1")
+                
.setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
+                        AnalysisType.COLUMN)
+                .build();
+        OlapAnalysisTask analysisJob = new 
OlapAnalysisTask(analysisTaskScheduler, analysisJobInfo);
+        AnalysisTaskWrapper analysisTaskWrapper = new 
AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
+        Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
+        b.put(analysisTaskWrapper);
+        new Expectations() {
+            {
+                analysisTaskWrapper.cancel();
+                times = 1;
+            }
+        };
+        analysisTaskExecutor.start();
+        BlockingCounter counter = 
Deencapsulation.getField(analysisTaskExecutor, "blockingCounter");
+        Assertions.assertEquals(0, counter.getVal());
+    }
+
+    @Test
+    public void testTaskExecution() throws Exception {
+        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskInfo analysisTaskInfo = new 
AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
+                
.setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1")
+                
.setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
+                        AnalysisType.COLUMN)
+                .build();
+        OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskScheduler, 
analysisTaskInfo);
+        new MockUp<AnalysisTaskScheduler>() {
+            @Mock
+            public synchronized BaseAnalysisTask getPendingTasks() {
+                return task;
+            }
+        };
+        new MockUp<AnalysisManager>() {
+            @Mock
+            public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState 
jobState, String message, long time) {}
+        };
+        new Expectations() {
+            {
+                task.execute();
+                times = 1;
+            }
+        };
+        Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java
new file mode 100644
index 0000000000..7188342b83
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.utframe.TestWithFeService;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Tested;
+import org.junit.jupiter.api.Test;
+
+public class MVStatisticsTest extends TestWithFeService {
+
+    @Injectable
+    StatisticsCache statisticsCache;
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        connectContext.setDatabase(SystemInfoService.DEFAULT_CLUSTER + ":" + 
"test");
+        createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, 
col3 int not null)\n"
+                + "DISTRIBUTED BY HASH(col3)\n"
+                + "BUCKETS 1\n"
+                + "PROPERTIES(\n"
+                + "    \"replication_num\"=\"1\"\n"
+                + ");\n");
+        createMv("CREATE MATERIALIZED VIEW mv1 AS SELECT col3 , SUM(COL2) FROM 
t1 group by col3");
+    }
+
+    @Tested
+
+    @Test
+    public void testCreate() throws Exception {
+        new Expectations() {
+            {
+                statisticsCache.refreshSync(anyLong, anyString);
+                times = 5;
+            }
+        };
+        new MockUp<StatisticsRepository>() {
+        };
+        new MockUp<StatisticsUtil>() {
+
+            @Mock
+            public void execUpdate(String sql) throws Exception {}
+        };
+        new MockUp<OlapAnalysisTask>(OlapAnalysisTask.class) {
+
+            @Mock
+            public void execSQL(String sql) throws Exception {}
+        };
+        new MockUp<Env>() {
+
+            @Mock
+            public StatisticsCache getStatisticsCache() {
+                return statisticsCache;
+            }
+        };
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+        Deencapsulation.setField(analysisManager, "statisticsCache", 
statisticsCache);
+        getSqlStmtExecutor("analyze t1");
+        Thread.sleep(3000);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 90b0e4658d..5ecce69719 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -41,7 +41,6 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.InternalSchemaInitializer;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Table;
@@ -121,13 +120,16 @@ public abstract class TestWithFeService {
 
     @BeforeAll
     public final void beforeAll() throws Exception {
-        InternalSchemaInitializer.forTest = true;
         beforeCreatingConnectContext();
         connectContext = createDefaultCtx();
+        beforeCluster();
         createDorisCluster();
         runBeforeAll();
     }
 
+    protected void beforeCluster() {
+    }
+
     @AfterAll
     public final void afterAll() throws Exception {
         runAfterAll();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to