EmmyMiao87 commented on a change in pull request #8348:
URL: https://github.com/apache/incubator-doris/pull/8348#discussion_r838221254
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
##########
@@ -18,46 +18,243 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.StatisticsJob.JobState;
import com.google.common.collect.Queues;
-import java.util.ArrayList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
+import java.util.Set;
-/*
-Schedule statistics job.
- 1. divide job to multi task
- 2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
+/**
+ * Schedule statistics job.
+ * 1. divide job to multi task
+ * 2. submit all task to StatisticsTaskScheduler
+ * Switch job state from pending to scheduling.
*/
public class StatisticsJobScheduler extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJobScheduler.class);
- public Queue<StatisticsJob> pendingJobQueue =
Queues.newLinkedBlockingQueue();
+ /**
+ * Different statistics need to be collected for the jobs submitted by
users.
+ * if all statistics be collected at the same time, the cluster may be
overburdened
+ * and normal query services may be affected. Therefore, we put the jobs
into the queue
+ * and schedule them one by one, and finally divide each job to several
subtasks and execute them.
+ */
+ public final Queue<StatisticsJob> pendingJobQueue;
public StatisticsJobScheduler() {
super("Statistics job scheduler", 0);
+ this.pendingJobQueue =
Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
}
@Override
protected void runAfterCatalogReady() {
- // TODO
- StatisticsJob pendingJob = pendingJobQueue.peek();
- // step0: check job state again
- // step1: divide statistics job to task
- List<StatisticsTask> statisticsTaskList = divide(pendingJob);
- // step2: submit
-
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+ StatisticsJob pendingJob = this.pendingJobQueue.peek();
+ if (pendingJob != null) {
+ // step0: check job state again
+ JobState jobState = pendingJob.getJobState();
+ if (jobState == JobState.PENDING) {
+ try {
+ // step1: divide statistics job to tasks
+ List<StatisticsTask> tasks = this.divide(pendingJob);
+ // step2: submit tasks
+
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
Review comment:
It seems that the StatisticsJob will not be removed after it has been
scheduled.
Then, when will StatisticsJobs be removed from the queue? They will hang
on this scheduler.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
##########
@@ -18,56 +18,128 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.statistics.StatisticsJob.JobState;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
-/*
-For unified management of statistics job,
-including job addition, cancellation, scheduling, etc.
+/**
+ * For unified management of statistics job,
+ * including job addition, cancellation, scheduling, etc.
*/
public class StatisticsJobManager {
private static final Logger LOG =
LogManager.getLogger(StatisticsJobManager.class);
- // statistics job
- private Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
+ /**
+ * save statistics job status information
+ */
+ private final Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
- public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
+ public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+ return idToStatisticsJob;
+ }
+
+ public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws
DdlException {
// step0: init statistics job by analyzeStmt
StatisticsJob statisticsJob =
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
- // step1: get statistics to be analyzed
- Set<Long> tableIdList = statisticsJob.relatedTableId();
+
+ // step1: get statistical db&tbl to be analyzed
+ long dbId = statisticsJob.getDbId();
+ Set<Long> tableIds = statisticsJob.relatedTableId();
+
// step2: check restrict
- checkRestrict(tableIdList);
+ this.checkRestrict(dbId, tableIds);
+
// step3: check permission
- checkPermission();
+ UserIdentity userInfo = analyzeStmt.getUserInfo();
+ this.checkPermission(dbId, tableIds, userInfo);
+
// step4: create it
- createStatisticsJob(statisticsJob);
+ this.createStatisticsJob(statisticsJob);
}
public void createStatisticsJob(StatisticsJob statisticsJob) {
idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
try {
Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
} catch (IllegalStateException e) {
- LOG.info("The pending statistics job is full. Please submit it
again later.");
+ LOG.warn("The pending statistics job is full. Please submit it
again later.");
}
}
- // Rule1: The same table cannot have two unfinished statistics jobs
- // Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
- // Rule3: The job for external table is not supported
- private void checkRestrict(Set<Long> tableIdList) {
- // TODO
+ /**
+ * Rule1: The same table cannot have two unfinished statistics jobs
+ * Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
+ * Rule3: The job for external table is not supported
+ */
+ private void checkRestrict(long dbId, Set<Long> tableIds) throws
DdlException {
Review comment:
Is it thread-safe in Rule2 and Rule3?
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -18,20 +18,27 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
+import com.clearspring.analytics.util.Lists;
Review comment:
import order ~
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -40,40 +47,166 @@
SCHEDULING,
RUNNING,
FINISHED,
- CANCELLED
+ CANCELLED,
+ FAILED
}
- private long id = -1;
+ private final long id;
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final List<Long> tableIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * to be executed tasks.
+ */
+ private final List<StatisticsTask> tasks;
+
+ /**
+ * The progress of the job, it's equal to the number of completed tasks.
+ */
+ private int progress = 0;
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
Review comment:
properties missing ~
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -40,40 +47,166 @@
SCHEDULING,
RUNNING,
FINISHED,
- CANCELLED
+ CANCELLED,
+ FAILED
}
- private long id = -1;
+ private final long id;
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final List<Long> tableIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * to be executed tasks.
+ */
+ private final List<StatisticsTask> tasks;
+
+ /**
+ * The progress of the job, it's equal to the number of completed tasks.
+ */
+ private int progress = 0;
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
- // end
- private List<StatisticsTask> taskList = Lists.newArrayList();
+ public StatisticsJob(Long dbId, List<Long> tableIdList, Map<Long,
List<String>> tableIdToColumnName) {
+ this.id = Catalog.getCurrentCatalog().getNextId();
+ this.dbId = dbId;
+ this.tableIds = tableIdList;
+ this.tableIdToColumnName = tableIdToColumnName;
+ this.tasks = Lists.newArrayList();
+ }
public long getId() {
return id;
}
- /*
- AnalyzeStmt: Analyze t1(c1), t2
- StatisticsJob:
- tableId [t1, t2]
- tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
- */
- public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
- // TODO
- return new StatisticsJob();
+ public long getDbId() {
+ return dbId;
+ }
+
+ public List<Long> getTableIds() {
+ return tableIds;
+ }
+
+ public Map<Long, List<String>> getTableIdToColumnName() {
+ return tableIdToColumnName;
+ }
+
+ public List<StatisticsTask> getTasks() {
+ return tasks;
+ }
+
+ public int getProgress() {
+ return progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ public JobState getJobState() {
+ return jobState;
+ }
+
+ public void setJobState(JobState jobState) {
+ this.jobState = jobState;
+ }
+
+ /**
+ * get statisticsJob from analyzeStmt.
+ * AnalyzeStmt: analyze t1(c1,c2,c3)
+ * tableId: [t1]
+ * tableIdToColumnName <t1, [c1,c2,c3]>
+ */
+ public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt)
throws DdlException {
Review comment:
The dbTableName is an optional property, so you should consider it in
this function ~
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
##########
@@ -18,56 +18,128 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.statistics.StatisticsJob.JobState;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
-/*
-For unified management of statistics job,
-including job addition, cancellation, scheduling, etc.
+/**
+ * For unified management of statistics job,
+ * including job addition, cancellation, scheduling, etc.
*/
public class StatisticsJobManager {
private static final Logger LOG =
LogManager.getLogger(StatisticsJobManager.class);
- // statistics job
- private Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
+ /**
+ * save statistics job status information
+ */
+ private final Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
- public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
+ public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+ return idToStatisticsJob;
+ }
+
+ public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws
DdlException {
// step0: init statistics job by analyzeStmt
StatisticsJob statisticsJob =
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
- // step1: get statistics to be analyzed
- Set<Long> tableIdList = statisticsJob.relatedTableId();
+
+ // step1: get statistical db&tbl to be analyzed
+ long dbId = statisticsJob.getDbId();
+ Set<Long> tableIds = statisticsJob.relatedTableId();
+
// step2: check restrict
- checkRestrict(tableIdList);
+ this.checkRestrict(dbId, tableIds);
+
// step3: check permission
- checkPermission();
+ UserIdentity userInfo = analyzeStmt.getUserInfo();
+ this.checkPermission(dbId, tableIds, userInfo);
+
// step4: create it
- createStatisticsJob(statisticsJob);
+ this.createStatisticsJob(statisticsJob);
}
public void createStatisticsJob(StatisticsJob statisticsJob) {
idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
try {
Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
} catch (IllegalStateException e) {
- LOG.info("The pending statistics job is full. Please submit it
again later.");
+ LOG.warn("The pending statistics job is full. Please submit it
again later.");
Review comment:
info is better. warn implies that the system has encountered a warning condition.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
##########
@@ -18,56 +18,128 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.statistics.StatisticsJob.JobState;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
-/*
-For unified management of statistics job,
-including job addition, cancellation, scheduling, etc.
+/**
+ * For unified management of statistics job,
+ * including job addition, cancellation, scheduling, etc.
*/
public class StatisticsJobManager {
private static final Logger LOG =
LogManager.getLogger(StatisticsJobManager.class);
- // statistics job
- private Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
+ /**
+ * save statistics job status information
+ */
+ private final Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
- public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
+ public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+ return idToStatisticsJob;
+ }
+
+ public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws
DdlException {
// step0: init statistics job by analyzeStmt
StatisticsJob statisticsJob =
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
- // step1: get statistics to be analyzed
- Set<Long> tableIdList = statisticsJob.relatedTableId();
+
+ // step1: get statistical db&tbl to be analyzed
+ long dbId = statisticsJob.getDbId();
+ Set<Long> tableIds = statisticsJob.relatedTableId();
+
// step2: check restrict
- checkRestrict(tableIdList);
+ this.checkRestrict(dbId, tableIds);
+
// step3: check permission
- checkPermission();
+ UserIdentity userInfo = analyzeStmt.getUserInfo();
+ this.checkPermission(dbId, tableIds, userInfo);
+
// step4: create it
- createStatisticsJob(statisticsJob);
+ this.createStatisticsJob(statisticsJob);
}
public void createStatisticsJob(StatisticsJob statisticsJob) {
idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
try {
Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
} catch (IllegalStateException e) {
- LOG.info("The pending statistics job is full. Please submit it
again later.");
+ LOG.warn("The pending statistics job is full. Please submit it
again later.");
}
}
- // Rule1: The same table cannot have two unfinished statistics jobs
- // Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
- // Rule3: The job for external table is not supported
- private void checkRestrict(Set<Long> tableIdList) {
- // TODO
+ /**
+ * Rule1: The same table cannot have two unfinished statistics jobs
+ * Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
+ * Rule3: The job for external table is not supported
+ */
+ private void checkRestrict(long dbId, Set<Long> tableIds) throws
DdlException {
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+
+ // check table type
+ for (Long tableId : tableIds) {
+ Table table = db.getTableOrDdlException(tableId);
+ if (table.getType() != Table.TableType.OLAP) {
+ throw new DdlException("The external table(" + table.getName()
+ ") is not supported.");
+ }
+ }
+
+ int unfinishedJobs = 0;
+ StatisticsJobScheduler jobScheduler =
Catalog.getCurrentCatalog().getStatisticsJobScheduler();
+ Queue<StatisticsJob> statisticsJobs = jobScheduler.pendingJobQueue;
+
+ // check table unfinished job
+ for (StatisticsJob statisticsJob : statisticsJobs) {
+ JobState jobState = statisticsJob.getJobState();
+ List<Long> tableIdList = statisticsJob.getTableIds();
+ if (jobState == JobState.PENDING || jobState ==
JobState.SCHEDULING || jobState == JobState.RUNNING) {
+ for (Long tableId : tableIds) {
+ if (tableIdList.contains(tableId)) {
+ throw new DdlException("The same table(id=" + tableId
+ ") cannot have two unfinished statistics jobs.");
+ }
+ }
+ unfinishedJobs++;
+ }
+ }
+
+ // check the number of unfinished tasks
+ if (unfinishedJobs > Config.cbo_max_statistics_job_num) {
+ throw new DdlException("The unfinished statistics job could not
more then Config.cbo_max_statistics_job_num.");
+ }
}
- private void checkPermission() {
- // TODO
+ private void checkPermission(long dbId, Set<Long> tableIds, UserIdentity
userInfo) throws DdlException {
+ PaloAuth auth = Catalog.getCurrentCatalog().getAuth();
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+
+ // check the db permission
+ String dbName = db.getFullName();
+ boolean dbPermission = auth.checkDbPriv(userInfo, dbName,
PrivPredicate.SELECT);
+ if (!dbPermission) {
+ throw new DdlException("You do not have permissions to analyze the
database(" + dbName + ").");
Review comment:
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR
xxx
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
##########
@@ -18,56 +18,128 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.statistics.StatisticsJob.JobState;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
-/*
-For unified management of statistics job,
-including job addition, cancellation, scheduling, etc.
+/**
+ * For unified management of statistics job,
+ * including job addition, cancellation, scheduling, etc.
*/
public class StatisticsJobManager {
private static final Logger LOG =
LogManager.getLogger(StatisticsJobManager.class);
- // statistics job
- private Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
+ /**
+ * save statistics job status information
+ */
+ private final Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
- public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
+ public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+ return idToStatisticsJob;
+ }
+
+ public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws
DdlException {
// step0: init statistics job by analyzeStmt
StatisticsJob statisticsJob =
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
- // step1: get statistics to be analyzed
- Set<Long> tableIdList = statisticsJob.relatedTableId();
+
+ // step1: get statistical db&tbl to be analyzed
+ long dbId = statisticsJob.getDbId();
+ Set<Long> tableIds = statisticsJob.relatedTableId();
+
// step2: check restrict
- checkRestrict(tableIdList);
+ this.checkRestrict(dbId, tableIds);
+
// step3: check permission
- checkPermission();
+ UserIdentity userInfo = analyzeStmt.getUserInfo();
+ this.checkPermission(dbId, tableIds, userInfo);
+
// step4: create it
- createStatisticsJob(statisticsJob);
+ this.createStatisticsJob(statisticsJob);
}
public void createStatisticsJob(StatisticsJob statisticsJob) {
idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
try {
Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
} catch (IllegalStateException e) {
- LOG.info("The pending statistics job is full. Please submit it
again later.");
+ LOG.warn("The pending statistics job is full. Please submit it
again later.");
}
}
- // Rule1: The same table cannot have two unfinished statistics jobs
- // Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
- // Rule3: The job for external table is not supported
- private void checkRestrict(Set<Long> tableIdList) {
- // TODO
+ /**
+ * Rule1: The same table cannot have two unfinished statistics jobs
+ * Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
+ * Rule3: The job for external table is not supported
+ */
+ private void checkRestrict(long dbId, Set<Long> tableIds) throws
DdlException {
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+
+ // check table type
+ for (Long tableId : tableIds) {
+ Table table = db.getTableOrDdlException(tableId);
+ if (table.getType() != Table.TableType.OLAP) {
+ throw new DdlException("The external table(" + table.getName()
+ ") is not supported.");
+ }
+ }
+
+ int unfinishedJobs = 0;
+ StatisticsJobScheduler jobScheduler =
Catalog.getCurrentCatalog().getStatisticsJobScheduler();
Review comment:
The scheduler only contains the pending jobs rather than all of the jobs.
The StatisticsJobManager actually contains all of the jobs.
So, it is better to obtain the jobs from StatisticsJobManager.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java
##########
@@ -18,56 +18,128 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.statistics.StatisticsJob.JobState;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
-/*
-For unified management of statistics job,
-including job addition, cancellation, scheduling, etc.
+/**
+ * For unified management of statistics job,
+ * including job addition, cancellation, scheduling, etc.
*/
public class StatisticsJobManager {
private static final Logger LOG =
LogManager.getLogger(StatisticsJobManager.class);
- // statistics job
- private Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
+ /**
+ * save statistics job status information
+ */
+ private final Map<Long, StatisticsJob> idToStatisticsJob =
Maps.newConcurrentMap();
- public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
+ public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+ return idToStatisticsJob;
+ }
+
+ public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws
DdlException {
// step0: init statistics job by analyzeStmt
StatisticsJob statisticsJob =
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
- // step1: get statistics to be analyzed
- Set<Long> tableIdList = statisticsJob.relatedTableId();
+
+ // step1: get statistical db&tbl to be analyzed
+ long dbId = statisticsJob.getDbId();
+ Set<Long> tableIds = statisticsJob.relatedTableId();
+
// step2: check restrict
- checkRestrict(tableIdList);
+ this.checkRestrict(dbId, tableIds);
+
// step3: check permission
- checkPermission();
+ UserIdentity userInfo = analyzeStmt.getUserInfo();
+ this.checkPermission(dbId, tableIds, userInfo);
+
// step4: create it
- createStatisticsJob(statisticsJob);
+ this.createStatisticsJob(statisticsJob);
}
public void createStatisticsJob(StatisticsJob statisticsJob) {
idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
try {
Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
} catch (IllegalStateException e) {
- LOG.info("The pending statistics job is full. Please submit it
again later.");
+ LOG.warn("The pending statistics job is full. Please submit it
again later.");
}
}
- // Rule1: The same table cannot have two unfinished statistics jobs
- // Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
- // Rule3: The job for external table is not supported
- private void checkRestrict(Set<Long> tableIdList) {
- // TODO
+ /**
+ * Rule1: The same table cannot have two unfinished statistics jobs
+ * Rule2: The unfinished statistics job could not more then
Config.max_statistics_job_num
+ * Rule3: The job for external table is not supported
+ */
+ private void checkRestrict(long dbId, Set<Long> tableIds) throws
DdlException {
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+
+ // check table type
+ for (Long tableId : tableIds) {
+ Table table = db.getTableOrDdlException(tableId);
+ if (table.getType() != Table.TableType.OLAP) {
+ throw new DdlException("The external table(" + table.getName()
+ ") is not supported.");
+ }
+ }
+
+ int unfinishedJobs = 0;
+ StatisticsJobScheduler jobScheduler =
Catalog.getCurrentCatalog().getStatisticsJobScheduler();
+ Queue<StatisticsJob> statisticsJobs = jobScheduler.pendingJobQueue;
+
+ // check table unfinished job
+ for (StatisticsJob statisticsJob : statisticsJobs) {
+ JobState jobState = statisticsJob.getJobState();
+ List<Long> tableIdList = statisticsJob.getTableIds();
+ if (jobState == JobState.PENDING || jobState ==
JobState.SCHEDULING || jobState == JobState.RUNNING) {
+ for (Long tableId : tableIds) {
+ if (tableIdList.contains(tableId)) {
+ throw new DdlException("The same table(id=" + tableId
+ ") cannot have two unfinished statistics jobs.");
+ }
+ }
+ unfinishedJobs++;
+ }
+ }
+
+ // check the number of unfinished tasks
+ if (unfinishedJobs > Config.cbo_max_statistics_job_num) {
+ throw new DdlException("The unfinished statistics job could not
more then Config.cbo_max_statistics_job_num.");
+ }
}
- private void checkPermission() {
- // TODO
+ private void checkPermission(long dbId, Set<Long> tableIds, UserIdentity
userInfo) throws DdlException {
+ PaloAuth auth = Catalog.getCurrentCatalog().getAuth();
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+
+ // check the db permission
+ String dbName = db.getFullName();
+ boolean dbPermission = auth.checkDbPriv(userInfo, dbName,
PrivPredicate.SELECT);
+ if (!dbPermission) {
+ throw new DdlException("You do not have permissions to analyze the
database(" + dbName + ").");
+ }
+
+ // check the tables permission
+ for (Long tableId : tableIds) {
+ Table tbl = db.getTableOrDdlException(tableId);
+ boolean tblPermission = auth.checkTblPriv(userInfo, dbName,
tbl.getName(), PrivPredicate.SELECT);
Review comment:
Is it possible to check the table directly for permission verification?
Does having the SELECT permission on the db mean that the user must have the
SELECT permission on all tables in the db?
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -40,40 +47,166 @@
SCHEDULING,
RUNNING,
FINISHED,
- CANCELLED
+ CANCELLED,
+ FAILED
}
- private long id = -1;
+ private final long id;
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final List<Long> tableIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * to be executed tasks.
+ */
+ private final List<StatisticsTask> tasks;
+
+ /**
+ * The progress of the job, it's equal to the number of completed tasks.
+ */
+ private int progress = 0;
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
- // end
- private List<StatisticsTask> taskList = Lists.newArrayList();
+ public StatisticsJob(Long dbId, List<Long> tableIdList, Map<Long,
List<String>> tableIdToColumnName) {
+ this.id = Catalog.getCurrentCatalog().getNextId();
+ this.dbId = dbId;
+ this.tableIds = tableIdList;
+ this.tableIdToColumnName = tableIdToColumnName;
+ this.tasks = Lists.newArrayList();
+ }
public long getId() {
return id;
}
- /*
- AnalyzeStmt: Analyze t1(c1), t2
- StatisticsJob:
- tableId [t1, t2]
- tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
- */
- public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
- // TODO
- return new StatisticsJob();
+ public long getDbId() {
+ return dbId;
+ }
+
+ public List<Long> getTableIds() {
+ return tableIds;
+ }
+
+ public Map<Long, List<String>> getTableIdToColumnName() {
+ return tableIdToColumnName;
+ }
+
+ public List<StatisticsTask> getTasks() {
+ return tasks;
+ }
+
+ public int getProgress() {
+ return progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ public JobState getJobState() {
+ return jobState;
+ }
+
+ public void setJobState(JobState jobState) {
+ this.jobState = jobState;
+ }
+
+ /**
+ * get statisticsJob from analyzeStmt.
Review comment:
```suggestion
* construct statisticsJob from analyzeStmt.
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -40,40 +47,166 @@
SCHEDULING,
RUNNING,
FINISHED,
- CANCELLED
+ CANCELLED,
+ FAILED
}
- private long id = -1;
+ private final long id;
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final List<Long> tableIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * to be executed tasks.
+ */
+ private final List<StatisticsTask> tasks;
+
+ /**
+ * The progress of the job, it's equal to the number of completed tasks.
+ */
+ private int progress = 0;
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
- // end
- private List<StatisticsTask> taskList = Lists.newArrayList();
+ public StatisticsJob(Long dbId, List<Long> tableIdList, Map<Long,
List<String>> tableIdToColumnName) {
+ this.id = Catalog.getCurrentCatalog().getNextId();
+ this.dbId = dbId;
+ this.tableIds = tableIdList;
+ this.tableIdToColumnName = tableIdToColumnName;
+ this.tasks = Lists.newArrayList();
+ }
public long getId() {
return id;
}
- /*
- AnalyzeStmt: Analyze t1(c1), t2
- StatisticsJob:
- tableId [t1, t2]
- tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
- */
- public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
- // TODO
- return new StatisticsJob();
+ public long getDbId() {
+ return dbId;
+ }
+
+ public List<Long> getTableIds() {
+ return tableIds;
+ }
+
+ public Map<Long, List<String>> getTableIdToColumnName() {
+ return tableIdToColumnName;
+ }
+
+ public List<StatisticsTask> getTasks() {
+ return tasks;
+ }
+
+ public int getProgress() {
+ return progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ public JobState getJobState() {
+ return jobState;
+ }
+
+ public void setJobState(JobState jobState) {
+ this.jobState = jobState;
+ }
+
+ /**
+ * get statisticsJob from analyzeStmt.
+ * AnalyzeStmt: analyze t1(c1,c2,c3)
+ * tableId: [t1]
+ * tableIdToColumnName <t1, [c1,c2,c3]>
+ */
+ public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt)
throws DdlException {
+ long dbId;
+ final List<Long> tableIdList = Lists.newArrayList();
+ final Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
+ final TableName dbTableName = analyzeStmt.getTableName();
+
+ // analyze table
Review comment:
This should be checked in AnalyzeStmt.analyze().
Here it should just get the db name and table name without the analyzer.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -40,40 +47,166 @@
SCHEDULING,
RUNNING,
FINISHED,
- CANCELLED
+ CANCELLED,
+ FAILED
}
- private long id = -1;
+ private final long id;
Review comment:
```suggestion
private final long id = Catalog.getCurrentCatalog().getNextId();
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
##########
@@ -40,40 +47,166 @@
SCHEDULING,
RUNNING,
FINISHED,
- CANCELLED
+ CANCELLED,
+ FAILED
}
- private long id = -1;
+ private final long id;
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final List<Long> tableIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * to be executed tasks.
+ */
+ private final List<StatisticsTask> tasks;
Review comment:
```suggestion
private final List<StatisticsTask> tasks = Lists.newXXX;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]