This is an automated email from the ASF dual-hosted git repository.
lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new dfbeeccd47 [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks (#8859)
dfbeeccd47 is described below
commit dfbeeccd4795b3d4fe26a60b288c9aee85187cf9
Author: ElvinWei <[email protected]>
AuthorDate: Wed Apr 27 11:05:43 2022 +0800
[feature-wip](statistics) step2: schedule the statistics job and generate executable tasks (#8859)
This pull request includes part of the statistics implementation
(https://github.com/apache/incubator-doris/issues/6370). It does not affect
any existing code, and users are not yet able to create statistics jobs.
After receiving a statistics collection statement, the system generates a job.
This change implements the division of statistics collection jobs according
to the following statistics categories:
table:
- `row_count`: the table row count, critical in estimating the cardinality
  and memory usage of scan nodes.
- `data_size`: the table size; not applicable to the CBO, mainly used to
  monitor and manage table size.
column:
- `num_distinct_value`: used to determine the selectivity of an equality
  predicate.
- `min`: the minimum value.
- `max`: the maximum value.
- `num_nulls`: the number of null values.
- `avg_col_len`: the average length of the column, in bytes, used to
  estimate memory and network IO.
- `max_col_len`: the maximum length of the column, in bytes, used to
  estimate memory and network IO.
Dividing the job yields the statistics tasks; a sketch follows below.
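For illustration only (this sketch is not part of the patch): it shows how the
descriptors and task classes added below fit together for one table t1 and one
column c1, assuming a running FE (the task constructors allocate ids from the
Catalog), placeholder ids, and imports of java.util.Arrays,
java.util.Collections and java.util.List.

    // Hypothetical helper: builds the tasks for t1.c1 with placeholder ids.
    static List<StatisticsTask> sketchDivide(long jobId, long dbId, long tblId) {
        // table-level category and granularity descriptors
        StatsCategoryDesc tblCategory = new StatsCategoryDesc();
        tblCategory.setCategory(StatsCategoryDesc.StatsCategory.TABLE);
        tblCategory.setDbId(dbId);
        tblCategory.setTableId(tblId);
        StatsGranularityDesc tblGranularity = new StatsGranularityDesc();
        tblGranularity.setTableId(tblId);
        tblGranularity.setGranularity(StatsGranularityDesc.StatsGranularity.TABLE);

        // data_size is read from metadata, so it becomes a MetaStatisticsTask
        StatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
                tblGranularity, tblCategory, Collections.singletonList(StatsType.DATA_SIZE));

        // column-level category descriptor for c1
        StatsCategoryDesc colCategory = new StatsCategoryDesc();
        colCategory.setCategory(StatsCategoryDesc.StatsCategory.COLUMN);
        colCategory.setDbId(dbId);
        colCategory.setTableId(tblId);
        colCategory.setColumnName("c1");

        // min, max and ndv of c1 are collected together by one SQL subtask
        StatisticsTask minMaxNdvTask = new SQLStatisticsTask(jobId, tblGranularity,
                colCategory, Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV));

        return Arrays.asList(dataSizeTask, minMaxNdvTask);
    }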
---
.../org/apache/doris/statistics/StatisticsJob.java | 121 ++++++++++-
.../doris/statistics/StatisticsJobScheduler.java | 232 +++++++++++++++++++--
.../apache/doris/statistics/StatisticsTask.java | 142 ++++++++++++-
.../doris/statistics/StatisticsTaskScheduler.java | 5 +-
.../apache/doris/statistics/StatsCategoryDesc.java | 32 +++
.../doris/statistics/StatsGranularityDesc.java | 32 +++
6 files changed, 536 insertions(+), 28 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
index 19b7a6a93f..bfe496dbdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
@@ -21,15 +21,17 @@ import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/***
* Used to store statistics job info,
@@ -47,6 +49,8 @@ public class StatisticsJob {
CANCELLED
}
+ protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
private final long id = Catalog.getCurrentCatalog().getNextId();
/**
@@ -89,6 +93,22 @@ public class StatisticsJob {
this.properties = properties == null ? Maps.newHashMap() : properties;
}
+ public void readLock() {
+ lock.readLock().lock();
+ }
+
+ public void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
public long getId() {
return this.id;
}
@@ -137,6 +157,103 @@ public class StatisticsJob {
return this.progress;
}
+ public void updateJobState(JobState newState) throws IllegalStateException {
+ LOG.info("To change statistics job(id={}) state from {} to {}", id, jobState, newState);
+ writeLock();
+
+ try {
+ // PENDING -> SCHEDULING/FAILED/CANCELLED
+ if (jobState == JobState.PENDING) {
+ if (newState == JobState.SCHEDULING) {
+ // log before assigning so that the message shows the old state
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else if (newState == JobState.FAILED) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else if (newState == JobState.CANCELLED) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else {
+ LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
+ throw new IllegalStateException("Invalid job state transition from PENDING to " + newState);
+ }
+ return;
+ }
+
+ // SCHEDULING -> RUNNING/FAILED/CANCELLED
+ if (jobState == JobState.SCHEDULING) {
+ if (newState == JobState.RUNNING) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ // the job starts running, set the start time
+ this.startTime = System.currentTimeMillis();
+ } else if (newState == JobState.FAILED) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else if (newState == JobState.CANCELLED) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else {
+ LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
+ throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState);
+ }
+ return;
+ }
+
+ // RUNNING -> FINISHED/FAILED/CANCELLED
+ if (jobState == JobState.RUNNING) {
+ if (newState == JobState.FINISHED) {
+ // set the finish time
+ this.finishTime = System.currentTimeMillis();
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else if (newState == JobState.FAILED) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else if (newState == JobState.CANCELLED) {
+ LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
+ this.jobState = newState;
+ } else {
+ LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
+ throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState);
+ }
+ return;
+ }
+
+ // unsupported transition
+ LOG.info("Invalid job(id={}) state transition from {} to {}", id, jobState, newState);
+ throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState);
+ } finally {
+ writeUnlock();
+ LOG.info("Statistics job(id={}) current state is {}", id, jobState);
+ }
+ }
+
+ public void updateJobInfoByTaskId(Long taskId, String errorMsg) {
+ writeLock();
+
+ try {
+ for (StatisticsTask task : this.tasks) {
+ if (taskId == task.getId()) {
+ if (Strings.isNullOrEmpty(errorMsg)) {
+ this.progress += 1;
+ if (this.progress == this.tasks.size()) {
+ updateJobState(StatisticsJob.JobState.FINISHED);
+ }
+ task.updateTaskState(StatisticsTask.TaskState.FINISHED);
+ } else {
+ this.errorMsgs.add(errorMsg);
+ task.updateTaskState(StatisticsTask.TaskState.FAILED);
+ updateJobState(StatisticsJob.JobState.FAILED);
+ }
+ return;
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* get statisticsJob from analyzeStmt.
* AnalyzeStmt: analyze t1(c1,c2,c3)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
index 60aefa93c9..b62fea4381 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
@@ -18,23 +18,52 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.MasterDaemon;
import com.google.common.collect.Queues;
-import java.util.ArrayList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
+import java.util.Set;
-/*
-Schedule statistics job.
- 1. divide job to multi task
- 2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
+/**
+ * Schedule statistics job.
+ * 1. divide job to multi task
+ * 2. submit all task to StatisticsTaskScheduler
+ * Switch job state from pending to scheduling.
*/
public class StatisticsJobScheduler extends MasterDaemon {
+ private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+ /**
+ * If the table row count exceeds the maximum number of rows a single BE should scan per task,
+ * we divide the job into subtasks by partition. The relevant values (3700000000L and 600000000L)
+ * are derived from testing. COUNT_MAX_SCAN_PER_TASK is for count(expr);
+ * NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+ */
+ private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+ private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
- public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+ /**
+ * Different statistics need to be collected for the jobs submitted by users.
+ * If all statistics were collected at the same time, the cluster might be overburdened
+ * and normal query services might be affected. Therefore, we put the jobs into this queue
+ * and schedule them one by one; each job is finally divided into several subtasks that are then executed.
+ */
+ public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
public StatisticsJobScheduler() {
super("Statistics job scheduler", 0);
@@ -42,22 +71,193 @@ public class StatisticsJobScheduler extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- // TODO
StatisticsJob pendingJob = pendingJobQueue.peek();
- // step0: check job state again
- // step1: divide statistics job to task
- List<StatisticsTask> statisticsTaskList = divide(pendingJob);
- // step2: submit
- Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+ if (pendingJob != null) {
+ try {
+ if (pendingJob.getTasks().size() == 0) {
+ divide(pendingJob);
+ }
+ List<StatisticsTask> tasks = pendingJob.getTasks();
+ Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
+ pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
+ pendingJobQueue.remove();
+ } catch (IllegalStateException e) {
+ // the task queue is full; keep the job queued and try again next round
+ LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId());
+ } catch (DdlException e) {
+ pendingJobQueue.remove();
+ pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
+ LOG.info("Failed to schedule the statistics job(id={})", pendingJob.getId(), e);
+ }
+ }
}
public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
pendingJobQueue.add(statisticsJob);
}
+ /**
+ * Statistics tasks are of the following types:
+ * table:
+ * - row_count: the table row count, critical in estimating the cardinality and memory usage of scan nodes.
+ * - data_size: the table size; not applicable to the CBO, mainly used to monitor and manage table size.
+ * column:
+ * - num_distinct_value: used to determine the selectivity of an equality predicate.
+ * - min: the minimum value.
+ * - max: the maximum value.
+ * - num_nulls: the number of null values.
+ * - avg_col_len: the average length of the column, in bytes, used to estimate memory and network IO.
+ * - max_col_len: the maximum length of the column, in bytes, used to estimate memory and network IO.
+ * <p>
+ * Division:
+ * - min, max, ndv: these three full-scan statistics are collected by one subtask.
+ * - max_col_lens, avg_col_lens: these two sampled statistics are collected by one subtask.
+ * <p>
+ * If the table row count exceeds the maximum number of rows a single BE should scan per task,
+ * we divide the job into subtasks by partition. The relevant values (3700000000L and 600000000L) are derived from testing.
+ * <p>
+ * Eventually, we will get several subtasks of the following types:
+ *
+ * @throws DdlException DdlException
+ * @see MetaStatisticsTask
+ * @see SampleSQLStatisticsTask
+ * @see SQLStatisticsTask
+ */
+ private void divide(StatisticsJob statisticsJob) throws DdlException {
+ long jobId = statisticsJob.getId();
+ long dbId = statisticsJob.getDbId();
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+ Set<Long> tblIds = statisticsJob.getTblIds();
+ Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+ List<StatisticsTask> tasks = statisticsJob.getTasks();
+ List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+ for (Long tblId : tblIds) {
+ Table tbl = db.getTableOrDdlException(tblId);
+ long rowCount = tbl.getRowCount();
+ List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+ List<String> columnNameList = tableIdToColumnName.get(tblId);
+
+ // step 1: generate data_size task
+ StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, tblId);
+ StatsGranularityDesc dataSizeGranularity = getTblStatsGranularityDesc(tblId);
+ MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+ dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+ tasks.add(dataSizeTask);
+
+ // step 2: generate row_count task
+ KeysType keysType = ((OlapTable) tbl).getKeysType();
+ if (keysType == KeysType.DUP_KEYS) {
+ StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+ StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+ MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+ rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+ tasks.add(metaTask);
+ } else {
+ if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+ // divide subtasks by partition
+ for (Long partitionId : partitionIds) {
+ StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+ StatsGranularityDesc rowCountGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
+ SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+ rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+ tasks.add(sqlTask);
+ }
+ } else {
+ StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+ StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+ SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+ rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+ tasks.add(sqlTask);
+ }
+ }
+
+ // step 3: generate [min,max,ndv] task
+ if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+ // divide subtasks by partition
+ columnNameList.forEach(columnName -> {
+ for (Long partitionId : partitionIds) {
+ StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
+ StatsGranularityDesc columnGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
+ List<StatsType> statsTypes = Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
+ SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+ tasks.add(sqlTask);
+ }
+ });
+ } else {
+ for (String columnName : columnNameList) {
+ StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
+ StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
+ List<StatsType> statsTypes = Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
+ SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+ tasks.add(sqlTask);
+ }
+ }
+
+ // step 4: generate num_nulls task
+ for (String columnName : columnNameList) {
+ StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
+ StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
+ SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+ columnGranularity, columnCategory, Collections.singletonList(StatsType.NUM_NULLS));
+ tasks.add(sqlTask);
+ }
+
+ // step 5: generate [max_col_lens, avg_col_lens] task
+ for (String columnName : columnNameList) {
+ StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
+ StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
+ List<StatsType> statsTypes = Arrays.asList(StatsType.MAX_SIZE, StatsType.AVG_SIZE);
+ Column column = tbl.getColumn(columnName);
+ Type colType = column.getType();
+ if (colType.isStringType()) {
+ SQLStatisticsTask sampleSqlTask = new SampleSQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+ tasks.add(sampleSqlTask);
+ } else {
+ MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
+ tasks.add(metaTask);
+ }
+ }
+ }
+ }
+
+ private StatsCategoryDesc getTblStatsCategoryDesc(long dbId, long tableId) {
+ StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
+ statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.TABLE);
+ statsCategoryDesc.setDbId(dbId);
+ statsCategoryDesc.setTableId(tableId);
+ return statsCategoryDesc;
+ }
- private List<StatisticsTask> divide(StatisticsJob statisticsJob) {
- // TODO
- return new ArrayList<>();
+ private StatsCategoryDesc getColStatsCategoryDesc(long dbId, long tableId, String columnName) {
+ StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
+ statsCategoryDesc.setDbId(dbId);
+ statsCategoryDesc.setTableId(tableId);
+ statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.COLUMN);
+ statsCategoryDesc.setColumnName(columnName);
+ return statsCategoryDesc;
+ }
+
+ private StatsGranularityDesc getTblStatsGranularityDesc(long tableId) {
+ StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
+ statsGranularityDesc.setTableId(tableId);
+ statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.TABLE);
+ return statsGranularityDesc;
+ }
+
+ private StatsGranularityDesc getPartitionStatsGranularityDesc(long tableId, long partitionId) {
+ StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
+ statsGranularityDesc.setTableId(tableId);
+ statsGranularityDesc.setPartitionId(partitionId);
+ statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
+ return statsGranularityDesc;
+ }
+
+ private StatsGranularityDesc getTabletStatsGranularityDesc(long tableId) {
+ StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
+ statsGranularityDesc.setTableId(tableId);
+ statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.TABLET);
+ return statsGranularityDesc;
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
index 3692405bce..a98df1753d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
@@ -19,12 +19,17 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Catalog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The StatisticsTask belongs to one StatisticsJob.
* A job may be split into multiple tasks but a task can only belong to one job.
+ *
* @granularityDesc, @categoryDesc, @statsTypeList
* These three attributes indicate which statistics this task is responsible for collecting.
* In general, a task will collect more than one @StatsType at the same time
@@ -32,24 +37,147 @@ import java.util.concurrent.Callable;
* For example: the task is responsible for collecting min, max, ndv of t1.c1 in partition p1.
* @granularityDesc: StatsGranularity=partition
*/
-public class StatisticsTask implements Callable<StatisticsTaskResult> {
- protected long id = Catalog.getCurrentCatalog().getNextId();;
+public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
+ protected static final Logger LOG = LogManager.getLogger(StatisticsTask.class);
+
+ public enum TaskState {
+ PENDING,
+ RUNNING,
+ FINISHED,
+ FAILED
+ }
+
+ protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ protected long id = Catalog.getCurrentCatalog().getNextId();
protected long jobId;
protected StatsGranularityDesc granularityDesc;
protected StatsCategoryDesc categoryDesc;
protected List<StatsType> statsTypeList;
+ protected TaskState taskState = TaskState.PENDING;
- public StatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
- StatsCategoryDesc categoryDesc, List<StatsType> statsTypeList) {
+ protected final long createTime = System.currentTimeMillis();
+ protected long startTime = -1L;
+ protected long finishTime = -1L;
+
+ public StatisticsTask(long jobId,
+ StatsGranularityDesc granularityDesc,
+ StatsCategoryDesc categoryDesc,
+ List<StatsType> statsTypeList) {
this.jobId = jobId;
this.granularityDesc = granularityDesc;
this.categoryDesc = categoryDesc;
this.statsTypeList = statsTypeList;
}
+ public void readLock() {
+ lock.readLock().lock();
+ }
+
+ public void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ protected void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ protected void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ public long getId() {
+ return this.id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public long getJobId() {
+ return this.jobId;
+ }
+
+ public StatsGranularityDesc getGranularityDesc() {
+ return this.granularityDesc;
+ }
+
+ public StatsCategoryDesc getCategoryDesc() {
+ return this.categoryDesc;
+ }
+
+ public List<StatsType> getStatsTypeList() {
+ return this.statsTypeList;
+ }
+
+ public TaskState getTaskState() {
+ return this.taskState;
+ }
+
+ public long getCreateTime() {
+ return this.createTime;
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ /**
+ * Different statistics are collected by different methods, which subclasses implement.
+ *
+ * @return the result of this statistics task
+ * @throws Exception if the task fails
+ */
@Override
- public StatisticsTaskResult call() throws Exception {
- // TODO
- return null;
+ public abstract StatisticsTaskResult call() throws Exception;
+
+ public void updateTaskState(TaskState newState) throws IllegalStateException {
+ LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState);
+ writeLock();
+
+ try {
+ // PENDING -> RUNNING/FAILED
+ if (taskState == TaskState.PENDING) {
+ if (newState == TaskState.RUNNING) {
+ // log before assigning so that the message shows the old state
+ LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
+ taskState = newState;
+ // the task starts running, set the start time
+ startTime = System.currentTimeMillis();
+ } else if (newState == TaskState.FAILED) {
+ LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
+ taskState = newState;
+ } else {
+ LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
+ throw new IllegalStateException("Invalid task state transition from PENDING to " + newState);
+ }
+ return;
+ }
+
+ // RUNNING -> FINISHED/FAILED
+ if (taskState == TaskState.RUNNING) {
+ if (newState == TaskState.FINISHED) {
+ // set the finish time
+ finishTime = System.currentTimeMillis();
+ LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
+ taskState = newState;
+ } else if (newState == TaskState.FAILED) {
+ LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
+ taskState = newState;
+ } else {
+ LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
+ throw new IllegalStateException("Invalid task state transition from RUNNING to " + newState);
+ }
+ // return on success so we do not fall through to the invalid-transition error below
+ return;
+ }
+
+ LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
+ throw new IllegalStateException("Invalid task state transition from " + taskState + " to " + newState);
+ } finally {
+ writeUnlock();
+ LOG.info("Statistics task(id={}) current state is {}", id, taskState);
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
index 0c837f72fa..46450417b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
@@ -20,6 +20,7 @@ package org.apache.doris.statistics;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
@@ -31,15 +32,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import com.clearspring.analytics.util.Lists;
-
/*
Schedule statistics task
*/
public class StatisticsTaskScheduler extends MasterDaemon {
private final static Logger LOG = LogManager.getLogger(StatisticsTaskScheduler.class);
- private Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();
+ private final Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();
public StatisticsTaskScheduler() {
super("Statistics task scheduler", 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
index 5fed87e5dc..0691eaa970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
@@ -24,8 +24,40 @@ public class StatsCategoryDesc {
}
private StatsCategory category;
+ private long dbId;
private long tableId;
private String columnName;
+ public StatsCategory getCategory() {
+ return this.category;
+ }
+
+ public void setCategory(StatsCategory category) {
+ this.category = category;
+ }
+
+ public long getDbId() {
+ return this.dbId;
+ }
+
+ public void setDbId(long dbId) {
+ this.dbId = dbId;
+ }
+ public long getTableId() {
+ return this.tableId;
+ }
+
+ public void setTableId(long tableId) {
+ this.tableId = tableId;
+ }
+
+ public String getColumnName() {
+ return this.columnName;
+ }
+
+ public void setColumnName(String columnName) {
+ this.columnName = columnName;
+ }
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
index 91e4a43213..54790d964e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
@@ -29,4 +29,36 @@ public class StatsGranularityDesc {
private long partitionId;
private long tabletId;
+ public StatsGranularity getGranularity() {
+ return this.granularity;
+ }
+
+ public void setGranularity(StatsGranularity granularity) {
+ this.granularity = granularity;
+ }
+
+ public long getTableId() {
+ return this.tableId;
+ }
+
+ public void setTableId(long tableId) {
+ this.tableId = tableId;
+ }
+
+ public long getPartitionId() {
+ return this.partitionId;
+ }
+
+ public void setPartitionId(long partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public long getTabletId() {
+ return this.tabletId;
+ }
+
+ public void setTabletId(long tabletId) {
+ this.tabletId = tabletId;
+ }
}
+
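For reference, the partition-splitting rule used by StatisticsJobScheduler.divide()
above can be restated as a standalone check. The method below is illustrative,
not part of the patch; the constants and the alive-backend count come from the patch.

    // A single full-table task is acceptable only while every backend scans at
    // most maxScanPerTask rows; beyond that, divide() emits per-partition tasks.
    static boolean divideByPartition(long rowCount, int backendCount, long maxScanPerTask) {
        return rowCount > backendCount * maxScanPerTask;
    }

    // Example: with 3 alive backends and NDV_MAX_SCAN_PER_TASK = 600000000L,
    // a 2,000,000,000-row table is split per partition for the min/max/ndv
    // tasks, since 2,000,000,000 > 3 * 600,000,000 = 1,800,000,000.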
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]