EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854742859
##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
*/
public class StatisticsJob {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJob.class);
public enum JobState {
PENDING,
SCHEDULING,
RUNNING,
FINISHED,
+ FAILED,
CANCELLED
}
- private long id = -1;
+ private long id = Catalog.getCurrentCatalog().getNextId();
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final Set<Long> tblIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * timeout of a statistics task
+ */
+ private long taskTimeout;
+
+ /**
+ * to be executed tasks.
+ */
+ private List<StatisticsTask> tasks = Lists.newArrayList();
+
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
- // end
+ private final List<String> errorMsgs = Lists.newArrayList();
- private List<StatisticsTask> taskList = Lists.newArrayList();
+ private final long createTime = System.currentTimeMillis();
+ private long startTime = -1L;
+ private long finishTime = -1L;
+ private int progress = 0;
+
+ public StatisticsJob(Long dbId,
+ Set<Long> tblIds,
+ Map<Long, List<String>> tableIdToColumnName) {
+ this.dbId = dbId;
+ this.tblIds = tblIds;
+ this.tableIdToColumnName = tableIdToColumnName;
+ }
public long getId() {
- return id;
+ return this.id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public long getDbId() {
+ return this.dbId;
+ }
+
+ public Set<Long> getTblIds() {
+ return this.tblIds;
+ }
+
+ public Map<Long, List<String>> getTableIdToColumnName() {
+ return this.tableIdToColumnName;
+ }
+
+ public long getTaskTimeout() {
+ return taskTimeout;
}
- /*
- AnalyzeStmt: Analyze t1(c1), t2
- StatisticsJob:
- tableId [t1, t2]
- tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
- */
- public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
- // TODO
- return new StatisticsJob();
+ public List<StatisticsTask> getTasks() {
+ return this.tasks;
+ }
+
+ public void setTasks(List<StatisticsTask> tasks) {
+ this.tasks = tasks;
+ }
+
+ public List<String> getErrorMsgs() {
+ return errorMsgs;
+ }
+
+ public JobState getJobState() {
+ return this.jobState;
+ }
+
+ public long getCreateTime() {
+ return this.createTime;
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public int getProgress() {
+ return this.progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ private void setOptional(AnalyzeStmt stmt) {
+ if (stmt.getTaskTimeout() != -1) {
+ this.taskTimeout = stmt.getTaskTimeout();
+ }
+ }
+
+ public synchronized void updateJobState(JobState jobState) {
+ // PENDING -> SCHEDULING/FAILED/CANCELLED
+ if (this.jobState == JobState.PENDING) {
Review Comment:
Also, the job lock should be held while the state is being switched.
##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +47,211 @@
Switch job state from pending to scheduling.
*/
public class StatisticsJobScheduler extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJobScheduler.class);
+
+ /**
+ * If the table row-count is greater than the maximum number of Be scans
for a single BE,
+ * we'll divide subtasks by partition. relevant
values(3700000000L&600000000L) are derived from test.
+ * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is
for min(c1)/max(c1)/ndv(c1).
+ */
+ private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+ private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
- public Queue<StatisticsJob> pendingJobQueue =
Queues.newLinkedBlockingQueue();
+ /**
+ * Different statistics need to be collected for the jobs submitted by
users.
+ * if all statistics be collected at the same time, the cluster may be
overburdened
+ * and normal query services may be affected. Therefore, we put the jobs
into the queue
+ * and schedule them one by one, and finally divide each job to several
subtasks and execute them.
+ */
+ public final Queue<StatisticsJob> pendingJobQueue =
Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
public StatisticsJobScheduler() {
super("Statistics job scheduler", 0);
}
@Override
protected void runAfterCatalogReady() {
- // TODO
- StatisticsJob pendingJob = pendingJobQueue.peek();
- // step0: check job state again
- // step1: divide statistics job to task
- List<StatisticsTask> statisticsTaskList = divide(pendingJob);
- // step2: submit
-
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+ StatisticsJob pendingJob = this.pendingJobQueue.poll();
+ if (pendingJob != null) {
+ try {
+ List<StatisticsTask> tasks = this.divide(pendingJob);
Review Comment:
Put the tasks into the StatisticsJob at the same time.
##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
*/
public class StatisticsJob {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJob.class);
public enum JobState {
PENDING,
SCHEDULING,
RUNNING,
FINISHED,
+ FAILED,
CANCELLED
}
- private long id = -1;
+ private long id = Catalog.getCurrentCatalog().getNextId();
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final Set<Long> tblIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * timeout of a statistics task
+ */
+ private long taskTimeout;
+
+ /**
+ * to be executed tasks.
+ */
+ private List<StatisticsTask> tasks = Lists.newArrayList();
+
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
- // end
+ private final List<String> errorMsgs = Lists.newArrayList();
- private List<StatisticsTask> taskList = Lists.newArrayList();
+ private final long createTime = System.currentTimeMillis();
+ private long startTime = -1L;
+ private long finishTime = -1L;
+ private int progress = 0;
+
+ public StatisticsJob(Long dbId,
+ Set<Long> tblIds,
+ Map<Long, List<String>> tableIdToColumnName) {
+ this.dbId = dbId;
+ this.tblIds = tblIds;
+ this.tableIdToColumnName = tableIdToColumnName;
+ }
public long getId() {
- return id;
+ return this.id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public long getDbId() {
+ return this.dbId;
+ }
+
+ public Set<Long> getTblIds() {
+ return this.tblIds;
+ }
+
+ public Map<Long, List<String>> getTableIdToColumnName() {
+ return this.tableIdToColumnName;
+ }
+
+ public long getTaskTimeout() {
+ return taskTimeout;
}
- /*
- AnalyzeStmt: Analyze t1(c1), t2
- StatisticsJob:
- tableId [t1, t2]
- tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
- */
- public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
- // TODO
- return new StatisticsJob();
+ public List<StatisticsTask> getTasks() {
+ return this.tasks;
+ }
+
+ public void setTasks(List<StatisticsTask> tasks) {
+ this.tasks = tasks;
+ }
+
+ public List<String> getErrorMsgs() {
+ return errorMsgs;
+ }
+
+ public JobState getJobState() {
+ return this.jobState;
+ }
+
+ public long getCreateTime() {
+ return this.createTime;
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public int getProgress() {
+ return this.progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ private void setOptional(AnalyzeStmt stmt) {
+ if (stmt.getTaskTimeout() != -1) {
+ this.taskTimeout = stmt.getTaskTimeout();
+ }
+ }
+
+ public synchronized void updateJobState(JobState jobState) {
+ // PENDING -> SCHEDULING/FAILED/CANCELLED
Review Comment:
If there is an invalid state switch, it should throw an exception.
##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
*/
public class StatisticsJob {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJob.class);
public enum JobState {
PENDING,
SCHEDULING,
RUNNING,
FINISHED,
+ FAILED,
CANCELLED
}
- private long id = -1;
+ private long id = Catalog.getCurrentCatalog().getNextId();
+
+ /**
+ * to be collected database stats.
+ */
+ private final long dbId;
+
+ /**
+ * to be collected table stats.
+ */
+ private final Set<Long> tblIds;
+
+ /**
+ * to be collected column stats.
+ */
+ private final Map<Long, List<String>> tableIdToColumnName;
+
+ /**
+ * timeout of a statistics task
+ */
+ private long taskTimeout;
+
+ /**
+ * to be executed tasks.
+ */
+ private List<StatisticsTask> tasks = Lists.newArrayList();
+
private JobState jobState = JobState.PENDING;
- // optional
- // to be collected table stats
- private List<Long> tableId = Lists.newArrayList();
- // to be collected column stats
- private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
- private Map<String, String> properties;
- // end
+ private final List<String> errorMsgs = Lists.newArrayList();
- private List<StatisticsTask> taskList = Lists.newArrayList();
+ private final long createTime = System.currentTimeMillis();
+ private long startTime = -1L;
+ private long finishTime = -1L;
+ private int progress = 0;
+
+ public StatisticsJob(Long dbId,
+ Set<Long> tblIds,
+ Map<Long, List<String>> tableIdToColumnName) {
+ this.dbId = dbId;
+ this.tblIds = tblIds;
+ this.tableIdToColumnName = tableIdToColumnName;
+ }
public long getId() {
- return id;
+ return this.id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public long getDbId() {
+ return this.dbId;
+ }
+
+ public Set<Long> getTblIds() {
+ return this.tblIds;
+ }
+
+ public Map<Long, List<String>> getTableIdToColumnName() {
+ return this.tableIdToColumnName;
+ }
+
+ public long getTaskTimeout() {
+ return taskTimeout;
}
- /*
- AnalyzeStmt: Analyze t1(c1), t2
- StatisticsJob:
- tableId [t1, t2]
- tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
- */
- public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
- // TODO
- return new StatisticsJob();
+ public List<StatisticsTask> getTasks() {
+ return this.tasks;
+ }
+
+ public void setTasks(List<StatisticsTask> tasks) {
+ this.tasks = tasks;
+ }
+
+ public List<String> getErrorMsgs() {
+ return errorMsgs;
+ }
+
+ public JobState getJobState() {
+ return this.jobState;
+ }
+
+ public long getCreateTime() {
+ return this.createTime;
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public int getProgress() {
+ return this.progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ private void setOptional(AnalyzeStmt stmt) {
+ if (stmt.getTaskTimeout() != -1) {
+ this.taskTimeout = stmt.getTaskTimeout();
+ }
+ }
+
+ public synchronized void updateJobState(JobState jobState) {
+ // PENDING -> SCHEDULING/FAILED/CANCELLED
+ if (this.jobState == JobState.PENDING) {
+ if (jobState == JobState.SCHEDULING) {
+ this.jobState = JobState.SCHEDULING;
+ } else if (jobState == JobState.FAILED) {
+ this.jobState = JobState.FAILED;
+ } else if (jobState == JobState.CANCELLED) {
+ this.jobState = JobState.CANCELLED;
+ }
+ return;
+ }
+
+ // SCHEDULING -> RUNNING/FAILED/CANCELLED
+ if (this.jobState == JobState.SCHEDULING) {
+ if (jobState == JobState.RUNNING) {
+ this.jobState = JobState.RUNNING;
+ } else if (jobState == JobState.FAILED) {
+ this.jobState = JobState.FAILED;
+ } else if (jobState == JobState.CANCELLED) {
+ this.jobState = JobState.CANCELLED;
+ }
+ return;
+ }
+
+ // RUNNING -> FINISHED/FAILED/CANCELLED
+ if (this.jobState == JobState.RUNNING) {
+ if (jobState == JobState.FINISHED) {
+ this.jobState = JobState.FINISHED;
+ } else if (jobState == JobState.FAILED) {
+ this.jobState = JobState.FAILED;
+ } else if (jobState == JobState.CANCELLED) {
+ this.jobState = JobState.CANCELLED;
+ }
+ }
Review Comment:
Add a LOG statement here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]