This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this
push:
new 9458cd8c782 Support show auto analyze pending jobs. (#31926)
9458cd8c782 is described below
commit 9458cd8c782f62b4302a43e0f371ff27d4ef8cf5
Author: Jibing-Li <[email protected]>
AuthorDate: Thu Mar 7 15:02:02 2024 +0800
Support show auto analyze pending jobs. (#31926)
---
fe/fe-core/src/main/cup/sql_parser.cup | 4 +
.../doris/analysis/ShowAutoAnalyzeJobsStmt.java | 210 +++++++++++++++++++++
.../java/org/apache/doris/qe/ShowExecutor.java | 34 ++++
.../apache/doris/statistics/AnalysisManager.java | 40 +++-
.../doris/statistics/AutoAnalysisPendingJob.java | 50 +++++
.../org/apache/doris/statistics/JobPriority.java | 24 +++
.../doris/statistics/StatisticsAutoCollector.java | 16 +-
.../doris/statistics/StatisticsJobAppender.java | 25 ++-
8 files changed, 383 insertions(+), 20 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 62924d098ac..0fffbe8a3f7 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -4369,6 +4369,10 @@ show_param ::=
{:
RESULT = new ShowAnalyzeStmt(tbl, parser.where, true);
:}
+ | KW_AUTO KW_JOBS opt_table_name:tbl opt_wild_where
+ {:
+ RESULT = new ShowAutoAnalyzeJobsStmt(tbl, parser.where);
+ :}
| KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId
{:
RESULT = new ShowAnalyzeTaskStatus(jobId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java
new file mode 100644
index 00000000000..560387fa5bc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.statistics.JobPriority;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * ShowAutoAnalyzeJobsStmt is used to show pending auto analysis jobs.
+ * syntax:
+ *    SHOW AUTO JOBS
+ *        [TABLE]
+ *        [
+ *            WHERE
+ *            [PRIORITY = ["HIGH"|"MID"|"LOW"]]
+ *        ]
+ */
+public class ShowAutoAnalyzeJobsStmt extends ShowStmt {
+    // Name of the only column that may appear in the WHERE clause.
+    private static final String PRIORITY = "priority";
+    // Column titles of the result set, in output order.
+    private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("catalog_name")
+            .add("db_name")
+            .add("tbl_name")
+            .add("col_list")
+            .add("priority")
+            .build();
+
+    private final TableName tableName;
+    private final Expr whereClause;
+
+    // Upper-cased priority extracted from the WHERE predicate; stays null when no predicate is given.
+    private String jobPriority;
+
+    /**
+     * @param tableName optional table to restrict the listing to, may be null
+     * @param whereClause optional filter predicate, may be null
+     */
+    public ShowAutoAnalyzeJobsStmt(TableName tableName, Expr whereClause) {
+        this.tableName = tableName;
+        this.whereClause = whereClause;
+    }
+
+    /**
+     * @return the upper-cased priority taken from the WHERE clause, or null when none was given
+     */
+    public String getPriority() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The priority must be obtained after the parsing is complete");
+        return jobPriority;
+    }
+
+    /**
+     * @return the raw WHERE expression, or null when none was given
+     */
+    public Expr getWhereClause() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The whereClause must be obtained after the parsing is complete");
+        return whereClause;
+    }
+
+    /**
+     * Rejects the statement when the session has stats disabled, checks the SHOW privilege on the
+     * table when one is named, and validates the WHERE clause when one is present.
+     *
+     * @throws UserException if stats are disabled, access is denied or the WHERE clause is malformed
+     */
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        if (!ConnectContext.get().getSessionVariable().enableStats) {
+            // The two literals are concatenated, so the separating space has to be explicit.
+            throw new UserException("Analyze function is forbidden, you should add `enable_stats=true` "
+                    + "in your FE conf file");
+        }
+        super.analyze(analyzer);
+        if (tableName != null) {
+            tableName.analyze(analyzer);
+            String catalogName = tableName.getCtl();
+            String dbName = tableName.getDb();
+            String tblName = tableName.getTbl();
+            checkShowAnalyzePriv(catalogName, dbName, tblName);
+        }
+
+        // analyze where clause if not null
+        if (whereClause != null) {
+            analyzeSubPredicate(whereClause);
+        }
+    }
+
+    /**
+     * Builds the result set meta data: one VARCHAR(128) column per entry of TITLE_NAMES.
+     */
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+
+    /**
+     * Reports an access-denied analysis error unless the current user holds the SHOW privilege
+     * on the given table.
+     */
+    private void checkShowAnalyzePriv(String catalogName, String dbName, String tblName) throws AnalysisException {
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkTblPriv(ConnectContext.get(), catalogName, dbName, tblName, PrivPredicate.SHOW)) {
+            ErrorReport.reportAnalysisException(
+                    ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+                    "SHOW AUTO JOBS",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    dbName + ": " + tblName);
+        }
+    }
+
+    /**
+     * Validates the WHERE clause and stores the extracted priority.
+     * The field is only written after the whole predicate has been accepted.
+     *
+     * @throws AnalysisException if the predicate is not of the form PRIORITY = "HIGH|MID|LOW"
+     */
+    private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {
+        if (subExpr == null) {
+            return;
+        }
+
+        String parsedPriority = extractPriority(subExpr);
+        if (parsedPriority == null) {
+            throw new AnalysisException("Where clause should look like: "
+                    + "PRIORITY = \"HIGH|MID|LOW\"");
+        }
+        jobPriority = parsedPriority;
+    }
+
+    /**
+     * Extracts the priority name from a predicate of the form {@code priority = "<name>"}.
+     *
+     * @return the upper-cased JobPriority name, or null when the predicate has any other shape
+     */
+    private static String extractPriority(Expr subExpr) {
+        if (!(subExpr instanceof BinaryPredicate)
+                || ((BinaryPredicate) subExpr).getOp() != BinaryPredicate.Operator.EQ) {
+            return null;
+        }
+
+        // left child must be a reference to the "priority" column
+        if (!(subExpr.getChild(0) instanceof SlotRef)) {
+            return null;
+        }
+        String leftKey = ((SlotRef) subExpr.getChild(0)).getColumnName();
+        if (!PRIORITY.equalsIgnoreCase(leftKey)) {
+            return null;
+        }
+
+        // right child must be a non-empty string literal
+        if (!(subExpr.getChild(1) instanceof StringLiteral)) {
+            return null;
+        }
+        String value = subExpr.getChild(1).getStringValue();
+        if (Strings.isNullOrEmpty(value)) {
+            return null;
+        }
+
+        // Locale.ROOT keeps the mapping independent of the JVM default locale
+        // (under a Turkish locale "mid" would otherwise become "MİD").
+        String upperValue = value.toUpperCase(java.util.Locale.ROOT);
+        try {
+            JobPriority.valueOf(upperValue);
+        } catch (IllegalArgumentException e) {
+            // Not a JobPriority constant: the caller reports it as a malformed where clause.
+            return null;
+        }
+        return upperValue;
+    }
+
+    /**
+     * Renders the statement using the keywords the parser accepts for it (SHOW AUTO JOBS).
+     */
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SHOW AUTO JOBS");
+
+        if (tableName != null) {
+            sb.append(" ");
+            sb.append(tableName.toSql());
+        }
+
+        if (whereClause != null) {
+            sb.append(" ");
+            sb.append("WHERE");
+            sb.append(" ");
+            sb.append(whereClause.toSql());
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    /**
+     * @return the table the listing is restricted to, or null when no table was given
+     */
+    public TableName getTableName() {
+        return tableName;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 79a2dca8946..dda7045eabb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.analysis.ShowAlterStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.analysis.ShowAnalyzeTaskStatus;
import org.apache.doris.analysis.ShowAuthorStmt;
+import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt;
import org.apache.doris.analysis.ShowBackendsStmt;
import org.apache.doris.analysis.ShowBackupStmt;
import org.apache.doris.analysis.ShowBrokerStmt;
@@ -203,6 +204,7 @@ import org.apache.doris.mysql.privilege.PrivBitSet;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.AutoAnalysisPendingJob;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.ResultRow;
@@ -440,6 +442,8 @@ public class ShowExecutor {
handleShowCreateCatalog();
} else if (stmt instanceof ShowAnalyzeStmt) {
handleShowAnalyze();
+ } else if (stmt instanceof ShowAutoAnalyzeJobsStmt) {
+ handleShowAutoAnalyzePendingJobs();
} else if (stmt instanceof ShowTabletsBelongStmt) {
handleShowTabletsBelong();
} else if (stmt instanceof AdminCopyTabletStmt) {
@@ -2865,6 +2869,36 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
}
+    /**
+     * Builds the result set for ShowAutoAnalyzeJobsStmt.
+     * Each pending job returned by the analysis manager becomes one row:
+     * catalog name, database name, table name, column list, priority.
+     * A database or table that can no longer be found is shown with a placeholder text;
+     * a job whose lookup throws is logged and left out of the result.
+     */
+    private void handleShowAutoAnalyzePendingJobs() {
+        ShowAutoAnalyzeJobsStmt showStmt = (ShowAutoAnalyzeJobsStmt) stmt;
+        List<AutoAnalysisPendingJob> jobs
+                = Env.getCurrentEnv().getAnalysisManager().showAutoPendingJobs(showStmt);
+        List<List<String>> resultRows = Lists.newArrayList();
+        for (AutoAnalysisPendingJob job : jobs) {
+            try {
+                List<String> row = new ArrayList<>();
+                CatalogIf<? extends DatabaseIf<? extends TableIf>> c
+                        = StatisticsUtil.findCatalog(job.catalogName);
+                row.add(c.getName());
+                Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(job.dbName);
+                row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
+                if (databaseIf.isPresent()) {
+                    Optional<? extends TableIf> table = databaseIf.get().getTable(job.tableName);
+                    row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
+                } else {
+                    // Without the database the table cannot be resolved either.
+                    row.add("DB may get deleted");
+                }
+                row.add(job.getColumnNames());
+                row.add(String.valueOf(job.priority));
+                // The row is only published once every column has been resolved.
+                resultRows.add(row);
+            } catch (Exception e) {
+                // Best effort: skip this job, but pass the exception itself as the trailing
+                // argument so the stack trace is logged instead of only the message.
+                LOG.warn("Failed to get pending jobs for table {}.{}.{}, reason: {}",
+                        job.catalogName, job.dbName, job.tableName, e.getMessage(), e);
+            }
+        }
+        resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
+    }
+
private void handleShowTabletsBelong() {
ShowTabletsBelongStmt showStmt = (ShowTabletsBelongStmt) stmt;
List<List<String>> rows = new ArrayList<>();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index ed66eae0455..ad740e5283d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.DropAnalyzeJobStmt;
import org.apache.doris.analysis.DropStatsStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
+import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
@@ -113,9 +114,9 @@ public class AnalysisManager implements Writable {
private static final int COLUMN_QUEUE_SIZE = 1000;
public final Queue<HighPriorityColumn> highPriorityColumns = new
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Queue<HighPriorityColumn> midPriorityColumns = new
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
- public final Map<TableIf, Set<String>> highPriorityJobs = new
LinkedHashMap<>();
- public final Map<TableIf, Set<String>> midPriorityJobs = new
LinkedHashMap<>();
- public final Map<TableIf, Set<String>> lowPriorityJobs = new
LinkedHashMap<>();
+ public final Map<TableName, Set<String>> highPriorityJobs = new
LinkedHashMap<>();
+ public final Map<TableName, Set<String>> midPriorityJobs = new
LinkedHashMap<>();
+ public final Map<TableName, Set<String>> lowPriorityJobs = new
LinkedHashMap<>();
// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>>
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -595,6 +596,39 @@ public class AnalysisManager implements Writable {
}
}
+ public List<AutoAnalysisPendingJob>
showAutoPendingJobs(ShowAutoAnalyzeJobsStmt stmt) {
+ TableName tblName = stmt.getTableName();
+ String priority = stmt.getPriority();
+ List<AutoAnalysisPendingJob> result = Lists.newArrayList();
+ if (priority == null || priority.isEmpty()) {
+ result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH,
tblName));
+ result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID,
tblName));
+ result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW,
tblName));
+ } else if (priority.equals(JobPriority.HIGH.name())) {
+ result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH,
tblName));
+ } else if (priority.equals(JobPriority.MID.name())) {
+ result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID,
tblName));
+ } else if (priority.equals(JobPriority.LOW.name())) {
+ result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW,
tblName));
+ }
+ return result;
+ }
+
+    /**
+     * Converts the entries of one job map into AutoAnalysisPendingJob objects.
+     * The map is iterated while holding its monitor.
+     *
+     * @param jobMap table name to column names of the pending jobs of one priority
+     * @param priority the priority to record on every produced job
+     * @param tblName when not null, only the entry whose key equals it is returned
+     * @return the produced jobs, in the iteration order of the map
+     */
+    protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<String>> jobMap,
+            JobPriority priority, TableName tblName) {
+        List<AutoAnalysisPendingJob> pending = Lists.newArrayList();
+        synchronized (jobMap) {
+            jobMap.forEach((table, columns) -> {
+                // Skip entries that do not match the requested table, if one was requested.
+                if (tblName != null && !tblName.equals(table)) {
+                    return;
+                }
+                pending.add(new AutoAnalysisPendingJob(table.getCtl(),
+                        table.getDb(), table.getTbl(), columns, priority));
+            });
+        }
+        return pending;
+    }
+
public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java
new file mode 100644
index 00000000000..ddd06d17c81
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Snapshot of one pending auto analysis job: the table it targets, the columns
+ * to analyze and the priority it is queued with.
+ */
+public class AutoAnalysisPendingJob {
+
+    public final String catalogName;
+    public final String dbName;
+    public final String tableName;
+    // Unmodifiable copy of the column names taken at construction time; null when none were given.
+    public final Set<String> columnNames;
+    public final JobPriority priority;
+
+    /**
+     * @param catalogName name of the catalog the table belongs to
+     * @param dbName name of the database the table belongs to
+     * @param tableName name of the table
+     * @param columnNames columns to analyze, may be null; the set is copied, so later changes
+     *                    made by the caller are not reflected in this object
+     * @param priority priority the job is queued with
+     */
+    public AutoAnalysisPendingJob(String catalogName, String dbName, String tableName,
+            Set<String> columnNames, JobPriority priority) {
+        this.catalogName = catalogName;
+        this.dbName = dbName;
+        this.tableName = tableName;
+        // Defensive copy: the caller may keep mutating its set after handing it over, and this
+        // object is read later without the caller's lock. LinkedHashSet keeps the iteration
+        // order the source set had at copy time.
+        this.columnNames = columnNames == null
+                ? null
+                : Collections.unmodifiableSet(new LinkedHashSet<>(columnNames));
+        this.priority = priority;
+    }
+
+    /**
+     * @return the column names joined with ",", or an empty string when there are none
+     */
+    public String getColumnNames() {
+        if (columnNames == null) {
+            return "";
+        }
+        StringJoiner stringJoiner = new StringJoiner(",");
+        for (String colName : columnNames) {
+            stringJoiner.add(colName);
+        }
+        return stringJoiner.toString();
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
new file mode 100644
index 00000000000..2786b063563
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+/**
+ * Priority of a pending auto analysis job.
+ */
+public enum JobPriority {
+ HIGH,
+ MID,
+ LOW;
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index e0df94b5cb0..227074dbb5c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -17,6 +17,7 @@
package org.apache.doris.statistics;
+import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
@@ -57,13 +58,14 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
@Override
protected void collect() {
while (canCollect()) {
- Map.Entry<TableIf, Set<String>> job = getJob();
+ Map.Entry<TableName, Set<String>> job = getJob();
if (job == null) {
// No more job to process, break and sleep.
break;
}
try {
- TableIf table = job.getKey();
+ TableName tblName = job.getKey();
+ TableIf table = StatisticsUtil.findTable(tblName.getCtl(),
tblName.getDb(), tblName.getTbl());
if (!supportAutoAnalyze(table)) {
continue;
}
@@ -78,7 +80,7 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
processOneJob(table, columns);
} catch (Exception e) {
LOG.warn("Failed to analyze table {} with columns [{}]",
- job.getKey().getName(),
job.getValue().stream().collect(Collectors.joining(",")), e);
+ job.getKey().getTbl(),
job.getValue().stream().collect(Collectors.joining(",")), e);
}
}
}
@@ -88,9 +90,9 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
&&
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}
- protected Map.Entry<TableIf, Set<String>> getJob() {
+ protected Map.Entry<TableName, Set<String>> getJob() {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
- Optional<Map.Entry<TableIf, Set<String>>> job =
fetchJobFromMap(manager.highPriorityJobs);
+ Optional<Map.Entry<TableName, Set<String>>> job =
fetchJobFromMap(manager.highPriorityJobs);
if (job.isPresent()) {
return job.get();
}
@@ -102,9 +104,9 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
return job.isPresent() ? job.get() : null;
}
- protected Optional<Map.Entry<TableIf, Set<String>>>
fetchJobFromMap(Map<TableIf, Set<String>> jobMap) {
+ protected Optional<Map.Entry<TableName, Set<String>>>
fetchJobFromMap(Map<TableName, Set<String>> jobMap) {
synchronized (jobMap) {
- Optional<Map.Entry<TableIf, Set<String>>> first =
jobMap.entrySet().stream().findFirst();
+ Optional<Map.Entry<TableName, Set<String>>> first =
jobMap.entrySet().stream().findFirst();
first.ifPresent(entry -> jobMap.remove(entry.getKey()));
return first;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
index 71bb71d3cda..93d03a3fdb8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -17,6 +17,7 @@
package org.apache.doris.statistics;
+import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@@ -81,31 +82,33 @@ public class StatisticsJobAppender extends MasterDaemon {
}
}
- protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue,
Map<TableIf, Set<String>> jobsMap) {
+ protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue,
Map<TableName, Set<String>> jobsMap) {
int size = columnQueue.size();
for (int i = 0; i < size; i++) {
HighPriorityColumn column = columnQueue.poll();
LOG.info("Process column " + column.tblId + "." + column.colName);
TableIf table = StatisticsUtil.findTable(column.catalogId,
column.dbId, column.tblId);
+ TableName tableName = new
TableName(table.getDatabase().getCatalog().getName(),
+ table.getDatabase().getFullName(), table.getName());
synchronized (jobsMap) {
// If job map reach the upper limit, stop putting new jobs.
- if (!jobsMap.containsKey(table) && jobsMap.size() >=
JOB_MAP_SIZE) {
+ if (!jobsMap.containsKey(tableName) && jobsMap.size() >=
JOB_MAP_SIZE) {
LOG.info("Job map full.");
break;
}
- if (jobsMap.containsKey(table)) {
- jobsMap.get(table).add(column.colName);
+ if (jobsMap.containsKey(tableName)) {
+ jobsMap.get(tableName).add(column.colName);
} else {
HashSet<String> columns = new HashSet<>();
columns.add(column.colName);
- jobsMap.put(table, columns);
+ jobsMap.put(tableName, columns);
}
LOG.info("Column " + column.tblId + "." + column.colName + "
added");
}
}
}
- protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) {
+ protected void appendToLowQueue(Map<TableName, Set<String>> jobsMap) {
InternalCatalog catalog = Env.getCurrentInternalCatalog();
List<Long> sortedDbs =
catalog.getDbIds().stream().sorted().collect(Collectors.toList());
int batchSize = 100;
@@ -122,18 +125,20 @@ public class StatisticsJobAppender extends MasterDaemon {
if (!(t instanceof OlapTable) || t.getId() <= currentTableId) {
continue;
}
+ TableName tableName = new
TableName(t.getDatabase().getCatalog().getName(),
+ t.getDatabase().getFullName(), t.getName());
synchronized (jobsMap) {
// If job map reach the upper limit, stop adding new jobs.
- if (!jobsMap.containsKey(t) && jobsMap.size() >=
JOB_MAP_SIZE) {
+ if (!jobsMap.containsKey(tableName) && jobsMap.size() >=
JOB_MAP_SIZE) {
return;
}
Set<String> columns
= t.getColumns().stream().filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
.map(c -> c.getName()).collect(Collectors.toSet());
- if (jobsMap.containsKey(t)) {
- jobsMap.get(t).addAll(columns);
+ if (jobsMap.containsKey(tableName)) {
+ jobsMap.get(tableName).addAll(columns);
} else {
- jobsMap.put(t, columns);
+ jobsMap.put(tableName, columns);
}
}
currentTableId = t.getId();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]