This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dfbeeccd47 [feature-wip](statistics) step2: schedule the statistics 
job and generate executable tasks (#8859)
dfbeeccd47 is described below

commit dfbeeccd4795b3d4fe26a60b288c9aee85187cf9
Author: ElvinWei <[email protected]>
AuthorDate: Wed Apr 27 11:05:43 2022 +0800

    [feature-wip](statistics) step2: schedule the statistics job and generate 
executable tasks (#8859)
    
    This pull request includes some implementations of the 
statistics(https://github.com/apache/incubator-doris/issues/6370), it will not 
affect any existing code and users will not be able to create statistics job.
    
    After receiving the statistics collection statement, it generates a job. 
Here it implements the division of statistics collection jobs according to the 
following statistics categories:
    table:
    - `row_count`: table row count are critical in estimating cardinality and 
memory usage of scan nodes.
    - `data_size`: table size, not applicable to CBO, mainly used to monitor 
and manage table size.
    column:
    - `num_distinct_value`: used to determine the selectivity of an equivalent 
expression.
    - `min`: The minimum value.
    - `max`: The maximum value.
    - `num_nulls`: number of nulls.
    - `avg_col_len`: the average length of a column, in bytes, is used for 
memory and network IO evaluation.
    - `max_col_len`: the Max length of the column, in bytes, is used for memory 
and network IO evaluation.
    
    After the job is divided, statistics tasks will be obtained.
---
 .../org/apache/doris/statistics/StatisticsJob.java | 121 ++++++++++-
 .../doris/statistics/StatisticsJobScheduler.java   | 232 +++++++++++++++++++--
 .../apache/doris/statistics/StatisticsTask.java    | 142 ++++++++++++-
 .../doris/statistics/StatisticsTaskScheduler.java  |   5 +-
 .../apache/doris/statistics/StatsCategoryDesc.java |  32 +++
 .../doris/statistics/StatsGranularityDesc.java     |  32 +++
 6 files changed, 536 insertions(+), 28 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
index 19b7a6a93f..bfe496dbdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
@@ -21,15 +21,17 @@ import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /***
  * Used to store statistics job info,
@@ -47,6 +49,8 @@ public class StatisticsJob {
         CANCELLED
     }
 
+    protected final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+
     private final long id = Catalog.getCurrentCatalog().getNextId();
 
     /**
@@ -89,6 +93,22 @@ public class StatisticsJob {
         this.properties = properties == null ? Maps.newHashMap() : properties;
     }
 
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
     public long getId() {
         return this.id;
     }
@@ -137,6 +157,103 @@ public class StatisticsJob {
         return this.progress;
     }
 
+    public void updateJobState(JobState newState) throws IllegalStateException 
{
+        LOG.info("To change statistics job(id={}) state from {} to {}", id, 
jobState, newState);
+        writeLock();
+
+        try {
+            // PENDING -> SCHEDULING/FAILED/CANCELLED
+            if (jobState == JobState.PENDING) {
+                if (newState == JobState.SCHEDULING) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else {
+                    LOG.info("Invalid statistics job(id={}) state transition 
from {} to {}", id, jobState, newState);
+                    throw new IllegalStateException("Invalid job state 
transition from PENDING to " + newState);
+                }
+                return;
+            }
+
+            // SCHEDULING -> RUNNING/FAILED/CANCELLED
+            if (jobState == JobState.SCHEDULING) {
+                if (newState == JobState.RUNNING) {
+                    this.jobState = newState;
+                    // job start running, set start time
+                    this.startTime = System.currentTimeMillis();
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                }  else {
+                    LOG.info("Invalid statistics job(id={}) state transition 
from {} to {}", id, jobState, newState);
+                    throw new IllegalStateException("Invalid job state 
transition from SCHEDULING to " + newState);
+                }
+                return;
+            }
+
+            // RUNNING -> FINISHED/FAILED/CANCELLED
+            if (jobState == JobState.RUNNING) {
+                if (newState == JobState.FINISHED) {
+                    // set finish time
+                    this.finishTime = System.currentTimeMillis();
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) state changed from {} to 
{}", id, jobState, newState);
+                }  else {
+                    LOG.info("Invalid statistics job(id={}) state transition 
from {} to {}", id, jobState, newState);
+                    throw new IllegalStateException("Invalid job state 
transition from RUNNING to " + newState);
+                }
+                return;
+            }
+
+            // unsupported transition
+            LOG.info("Invalid job(id={}) state transition from {} to {}", id, 
jobState, newState);
+            throw new IllegalStateException("Invalid job state transition from 
" + jobState + " to " + newState);
+        } finally {
+            writeUnlock();
+            LOG.info("Statistics job(id={}) current state is {} ", id, 
jobState);
+        }
+    }
+
+    public void updateJobInfoByTaskId(Long taskId, String errorMsg) {
+        writeLock();
+
+        try {
+            for (StatisticsTask task : this.tasks) {
+                if (taskId == task.getId()) {
+                    if (Strings.isNullOrEmpty(errorMsg)) {
+                        this.progress += 1;
+                        if (this.progress == this.tasks.size()) {
+                            updateJobState(StatisticsJob.JobState.FINISHED);
+                        }
+                        
task.updateTaskState(StatisticsTask.TaskState.FINISHED);
+                    } else {
+                        this.errorMsgs.add(errorMsg);
+                        task.updateTaskState(StatisticsTask.TaskState.FAILED);
+                        updateJobState(StatisticsJob.JobState.FAILED);
+                    }
+                    return;
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     /**
      * get statisticsJob from analyzeStmt.
      * AnalyzeStmt: analyze t1(c1,c2,c3)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
index 60aefa93c9..b62fea4381 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
@@ -18,23 +18,52 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
 
 import com.google.common.collect.Queues;
 
-import java.util.ArrayList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
-/*
-Schedule statistics job.
-  1. divide job to multi task
-  2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
+/**
+  * Schedule statistics job.
+  *     1. divide job to multi task
+  *     2. submit all task to StatisticsTaskScheduler
+  * Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans 
for a single BE,
+     * we'll divide subtasks by partition. relevant 
values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is 
for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = 
Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by 
users.
+     * if all statistics be collected at the same time, the cluster may be 
overburdened
+     * and normal query services may be affected. Therefore, we put the jobs 
into the queue
+     * and schedule them one by one, and finally divide each job to several 
subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = 
Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
@@ -42,22 +71,193 @@ public class StatisticsJobScheduler extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
         StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        if (pendingJob != null) {
+            try {
+                if (pendingJob.getTasks().size() == 0) {
+                    divide(pendingJob);
+                }
+                List<StatisticsTask> tasks = pendingJob.getTasks();
+                
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
+                pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
+                pendingJobQueue.remove();
+            } catch (IllegalStateException e) {
+                // throw IllegalStateException if the queue is full, re-add 
the tasks next time
+                LOG.info("The statistics task queue is full, schedule the 
job(id={}) later", pendingJob.getId());
+            } catch (DdlException e) {
+                pendingJobQueue.remove();
+                pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
+                LOG.info("Failed to schedule the statistical job(id={})", 
pendingJob.getId(), e);
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws 
IllegalStateException {
         pendingJobQueue.add(statisticsJob);
     }
 
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: table row count are critical in estimating cardinality and 
memory usage of scan nodes.
+     * - data_size: table size, not applicable to CBO, mainly used to monitor 
and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an 
equivalent expression.
+     * - min: The minimum value.
+     * - max: The maximum value.
+     * - num_nulls: number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, is used for 
memory and network IO evaluation.
+     * - max_col_len: the Max length of the column, in bytes, is used for 
memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: These three full indicators are collected by a 
sub-task.
+     * - max_col_lens, avg_col_lens: Two sampling indicators were collected by 
a sub-task.
+     * <p>
+     * If the table row-count is greater than the maximum number of Be scans 
for a single BE,
+     * we'll divide subtasks by partition. relevant 
values(3700000000L&600000000L) are derived from test.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private void divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tblIds = statisticsJob.getTblIds();
+        Map<Long, List<String>> tableIdToColumnName = 
statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = 
Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tblId : tblIds) {
+            Table tbl = db.getTableOrDdlException(tblId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tblId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, 
tblId);
+            StatsGranularityDesc dataSizeGranularity = 
getTblStatsGranularityDesc(tblId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, 
Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = 
getTblStatsCategoryDesc(dbId, tblId);
+                StatsGranularityDesc rowCountGranularity = 
getTblStatsGranularityDesc(tblId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, 
Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+                    // divide subtasks by partition
+                    for (Long partitionId : partitionIds) {
+                        StatsCategoryDesc rowCountCategory = 
getTblStatsCategoryDesc(dbId, tblId);
+                        StatsGranularityDesc rowCountGranularity = 
getPartitionStatsGranularityDesc(tblId, partitionId);
+                        SQLStatisticsTask sqlTask = new 
SQLStatisticsTask(jobId,
+                                rowCountGranularity, rowCountCategory, 
Collections.singletonList(StatsType.ROW_COUNT));
+                        tasks.add(sqlTask);
+                    }
+                } else {
+                    StatsCategoryDesc rowCountCategory = 
getTblStatsCategoryDesc(dbId, tblId);
+                    StatsGranularityDesc rowCountGranularity = 
getTblStatsGranularityDesc(tblId);
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                            rowCountGranularity, rowCountCategory, 
Collections.singletonList(StatsType.ROW_COUNT));
+                    tasks.add(sqlTask);
+                }
+            }
+
+            // step 3: generate [min,max,ndv] task
+            if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+                // divide subtasks by partition
+                columnNameList.forEach(columnName -> {
+                    for (Long partitionId : partitionIds) {
+                        StatsCategoryDesc columnCategory = 
getColStatsCategoryDesc(dbId, tblId, columnName);
+                        StatsGranularityDesc columnGranularity = 
getPartitionStatsGranularityDesc(tblId, partitionId);
+                        List<StatsType> statsTypes = 
Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
+                        SQLStatisticsTask sqlTask = new 
SQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+                        tasks.add(sqlTask);
+                    }
+                });
+            } else {
+                for (String columnName : columnNameList) {
+                    StatsCategoryDesc columnCategory = 
getColStatsCategoryDesc(dbId, tblId, columnName);
+                    StatsGranularityDesc columnGranularity = 
getTblStatsGranularityDesc(tblId);
+                    List<StatsType> statsTypes = 
Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId, 
columnGranularity, columnCategory, statsTypes);
+                    tasks.add(sqlTask);
+                }
+            }
+
+            // step 4: generate num_nulls task
+            for (String columnName : columnNameList) {
+                StatsCategoryDesc columnCategory = 
getColStatsCategoryDesc(dbId, tblId, columnName);
+                StatsGranularityDesc columnGranularity = 
getTblStatsGranularityDesc(tblId);
+                SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                        columnGranularity, columnCategory, 
Collections.singletonList(StatsType.NUM_NULLS));
+                tasks.add(sqlTask);
+            }
+
+            // step 5: generate [max_col_lens, avg_col_lens] task
+            for (String columnName : columnNameList) {
+                StatsCategoryDesc columnCategory = 
getColStatsCategoryDesc(dbId, tblId, columnName);
+                StatsGranularityDesc columnGranularity = 
getTblStatsGranularityDesc(tblId);
+                List<StatsType> statsTypes = Arrays.asList(StatsType.MAX_SIZE, 
StatsType.AVG_SIZE);
+                Column column = tbl.getColumn(columnName);
+                Type colType = column.getType();
+                if (colType.isStringType()) {
+                    SQLStatisticsTask sampleSqlTask = new 
SampleSQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+                    tasks.add(sampleSqlTask);
+                } else {
+                    MetaStatisticsTask metaTask = new 
MetaStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+                    tasks.add(metaTask);
+                }
+            }
+        }
+    }
+
+    private StatsCategoryDesc getTblStatsCategoryDesc(long dbId, long tableId) 
{
+        StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
+        statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.TABLE);
+        statsCategoryDesc.setDbId(dbId);
+        statsCategoryDesc.setTableId(tableId);
+        return statsCategoryDesc;
+    }
 
-    private List<StatisticsTask> divide(StatisticsJob statisticsJob) {
-        // TODO
-        return new ArrayList<>();
+    private StatsCategoryDesc getColStatsCategoryDesc(long dbId, long tableId, 
String columnName) {
+        StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
+        statsCategoryDesc.setDbId(dbId);
+        statsCategoryDesc.setTableId(tableId);
+        statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.COLUMN);
+        statsCategoryDesc.setColumnName(columnName);
+        return statsCategoryDesc;
+    }
+
+    private StatsGranularityDesc getTblStatsGranularityDesc(long tableId) {
+        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
+        statsGranularityDesc.setTableId(tableId);
+        
statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.TABLE);
+        return statsGranularityDesc;
+    }
+
+    private StatsGranularityDesc getPartitionStatsGranularityDesc(long 
tableId, long partitionId) {
+        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
+        statsGranularityDesc.setTableId(tableId);
+        statsGranularityDesc.setPartitionId(partitionId);
+        
statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
+        return statsGranularityDesc;
+    }
+
+    private StatsGranularityDesc getTabletStatsGranularityDesc(long tableId) {
+        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
+        statsGranularityDesc.setTableId(tableId);
+        
statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
+        return statsGranularityDesc;
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
index 3692405bce..a98df1753d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
@@ -19,12 +19,17 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Catalog;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * The StatisticsTask belongs to one StatisticsJob.
  * A job may be split into multiple tasks but a task can only belong to one 
job.
+ *
  * @granularityDesc, @categoryDesc, @statsTypeList
  * These three attributes indicate which statistics this task is responsible 
for collecting.
  * In general, a task will collect more than one @StatsType at the same time
@@ -32,24 +37,147 @@ import java.util.concurrent.Callable;
  * For example: the task is responsible for collecting min, max, ndv of t1.c1 
in partition p1.
  * @granularityDesc: StatsGranularity=partition
  */
-public class StatisticsTask implements Callable<StatisticsTaskResult> {
-    protected long id = Catalog.getCurrentCatalog().getNextId();;
+public abstract class StatisticsTask implements Callable<StatisticsTaskResult> 
{
+    protected static final Logger LOG = 
LogManager.getLogger(StatisticsTask.class);
+
+    public enum TaskState {
+        PENDING,
+        RUNNING,
+        FINISHED,
+        FAILED
+    }
+
+    protected final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+
+    protected long id = Catalog.getCurrentCatalog().getNextId();
     protected long jobId;
     protected StatsGranularityDesc granularityDesc;
     protected StatsCategoryDesc categoryDesc;
     protected List<StatsType> statsTypeList;
+    protected TaskState taskState = TaskState.PENDING;
 
-    public StatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                          StatsCategoryDesc categoryDesc, List<StatsType> 
statsTypeList) {
+    protected final long createTime = System.currentTimeMillis();
+    protected long startTime = -1L;
+    protected long finishTime = -1L;
+
+    public StatisticsTask(long jobId,
+                          StatsGranularityDesc granularityDesc,
+                          StatsCategoryDesc categoryDesc,
+                          List<StatsType> statsTypeList) {
         this.jobId = jobId;
         this.granularityDesc = granularityDesc;
         this.categoryDesc = categoryDesc;
         this.statsTypeList = statsTypeList;
     }
 
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    protected void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    protected void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    public long getId() {
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getJobId() {
+        return this.jobId;
+    }
+
+    public StatsGranularityDesc getGranularityDesc() {
+        return this.granularityDesc;
+    }
+
+    public StatsCategoryDesc getCategoryDesc() {
+        return this.categoryDesc;
+    }
+
+    public List<StatsType> getStatsTypeList() {
+        return this.statsTypeList;
+    }
+
+    public TaskState getTaskState() {
+        return this.taskState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    /**
+     * Different statistics implement different collection methods.
+     *
+     * @return true if this task is finished, false otherwise
+     * @throws Exception
+     */
     @Override
-    public StatisticsTaskResult call() throws Exception {
-        // TODO
-        return null;
+    public abstract StatisticsTaskResult call() throws Exception;
+
+    public void updateTaskState(TaskState newState) throws 
IllegalStateException{
+        LOG.info("To change statistics task(id={}) state from {} to {}", id, 
taskState, newState);
+        writeLock();
+
+        try {
+            // PENDING -> RUNNING/FAILED
+            if (taskState == TaskState.PENDING) {
+                if (newState == TaskState.RUNNING) {
+                    taskState = newState;
+                    // task start running, set start time
+                    startTime = System.currentTimeMillis();
+                    LOG.info("Statistics task(id={}) state changed from {} to 
{}", id, taskState, newState);
+                } else if (newState == TaskState.FAILED) {
+                    taskState = newState;
+                    LOG.info("Statistics task(id={}) state changed from {} to 
{}", id, taskState, newState);
+                } else {
+                    LOG.info("Invalid task(id={}) state transition from {} to 
{}", id, taskState, newState);
+                    throw new IllegalStateException("Invalid task state 
transition from PENDING to " + newState);
+                }
+                return;
+            }
+
+            // RUNNING -> FINISHED/FAILED
+            if (taskState == TaskState.RUNNING) {
+                if (newState == TaskState.FINISHED) {
+                    // set finish time
+                    finishTime = System.currentTimeMillis();
+                    taskState = newState;
+                    LOG.info("Statistics task(id={}) state changed from {} to 
{}", id, taskState, newState);
+                } else if (newState == TaskState.FAILED) {
+                    taskState = newState;
+                    LOG.info("Statistics task(id={}) state changed from {} to 
{}", id, taskState, newState);
+                } else {
+                    LOG.info("Invalid task(id={}) state transition from {} to 
{}", id, taskState, newState);
+                    throw new IllegalStateException("Invalid task state 
transition from RUNNING to " + newState);
+                }
+            }
+
+            LOG.info("Invalid task(id={}) state transition from {} to {}", id, 
taskState, newState);
+            throw new IllegalStateException("Invalid task state transition 
from " + taskState + " to " + newState);
+        } finally {
+            writeUnlock();
+            LOG.info("Statistics task(id={}) current state is {}", id, 
taskState);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
index 0c837f72fa..46450417b6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
@@ -20,6 +20,7 @@ package org.apache.doris.statistics;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.MasterDaemon;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 
 import org.apache.logging.log4j.LogManager;
@@ -31,15 +32,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import com.clearspring.analytics.util.Lists;
-
 /*
 Schedule statistics task
  */
 public class StatisticsTaskScheduler extends MasterDaemon {
     private final static Logger LOG = 
LogManager.getLogger(StatisticsTaskScheduler.class);
 
-    private Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();
+    private final Queue<StatisticsTask> queue = 
Queues.newLinkedBlockingQueue();
 
     public StatisticsTaskScheduler() {
         super("Statistics task scheduler", 0);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
index 5fed87e5dc..0691eaa970 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
@@ -24,8 +24,40 @@ public class StatsCategoryDesc {
     }
 
     private StatsCategory category;
+    private long dbId;
     private long tableId;
     private String columnName;
 
+    public StatsCategory getCategory() {
+        return this.category;
+    }
+
+    public void setCategory(StatsCategory category) {
+        this.category = category;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public void setDbId(long dbId) {
+        this.dbId = dbId;
+    }
 
+    public long getTableId() {
+        return this.tableId;
+    }
+
+    public void setTableId(long tableId) {
+        this.tableId = tableId;
+    }
+
+    public String getColumnName() {
+        return this.columnName;
+    }
+
+    public void setColumnName(String columnName) {
+        this.columnName = columnName;
+    }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
index 91e4a43213..54790d964e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
@@ -29,4 +29,36 @@ public class StatsGranularityDesc {
     private long partitionId;
     private long tabletId;
 
+    public StatsGranularity getGranularity() {
+        return this.granularity;
+    }
+
+    public void setGranularity(StatsGranularity granularity) {
+        this.granularity = granularity;
+    }
+
+    public long getTableId() {
+        return this.tableId;
+    }
+
+    public void setTableId(long tableId) {
+        this.tableId = tableId;
+    }
+
+    public long getPartitionId() {
+        return this.partitionId;
+    }
+
+    public void setPartitionId(long partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public long getTabletId() {
+        return this.tabletId;
+    }
+
+    public void setTabletId(long tabletId) {
+        this.tabletId = tabletId;
+    }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to