This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 25da0567ae5 [improvement](statistics) Optimize drop stats operation
(#30144)
25da0567ae5 is described below
commit 25da0567ae5311258c2969e17f5f7dbf5df5e037
Author: Jibing-Li <[email protected]>
AuthorDate: Mon Jan 22 13:20:34 2024 +0800
[improvement](statistics) Optimize drop stats operation (#30144)
Previously, the drop stats operation called the isMaster() function
(columns * followers) times and issued the same number of RPCs to drop remote
column stats. This PR reduces the number of RPC calls and uses a more efficient
way to identify the master node instead of calling isMaster().
---
.../org/apache/doris/analysis/DropStatsStmt.java | 19 ++++++
.../apache/doris/service/FrontendServiceImpl.java | 12 +++-
.../apache/doris/statistics/AnalysisManager.java | 70 ++++++++++++++++------
.../doris/statistics/InvalidateStatsTarget.java | 48 +++++++++++++++
.../apache/doris/statistics/StatisticsCache.java | 17 +-----
5 files changed, 133 insertions(+), 33 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java
index ac08f01f31e..5e3bd20c0f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java
@@ -53,7 +53,10 @@ public class DropStatsStmt extends DdlStmt {
private Set<String> columnNames;
// Flag to drop external table row count in table_statistics.
private boolean dropTableRowCount;
+ private boolean isAllColumns;
+ private long catalogId;
+ private long dbId;
private long tblId;
public DropStatsStmt(boolean dropExpired) {
@@ -100,10 +103,13 @@ public class DropStatsStmt extends DdlStmt {
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName);
tblId = table.getId();
+ dbId = db.getId();
+ catalogId = catalog.getId();
// check permission
checkAnalyzePriv(db.getFullName(), table.getName());
// check columnNames
if (columnNames != null) {
+ isAllColumns = false;
for (String cName : columnNames) {
if (table.getColumn(cName) == null) {
ErrorReport.reportAnalysisException(
@@ -115,6 +121,7 @@ public class DropStatsStmt extends DdlStmt {
}
}
} else {
+ isAllColumns = true;
columnNames =
table.getColumns().stream().map(Column::getName).collect(Collectors.toSet());
}
}
@@ -123,10 +130,22 @@ public class DropStatsStmt extends DdlStmt {
return tblId;
}
+ public long getDbId() {
+ return dbId;
+ }
+
+ public long getCatalogIdId() {
+ return catalogId;
+ }
+
public Set<String> getColumnNames() {
return columnNames;
}
+ public boolean isAllColumns() {
+ return isAllColumns;
+ }
+
public boolean dropTableRowCount() {
return dropTableRowCount;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a9ae485d8f9..f9e3372bf28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -95,9 +95,12 @@ import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
+import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.InvalidateStatsTarget;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticsCacheKey;
+import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@@ -3041,8 +3044,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
@Override
public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest
request) throws TException {
- StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key,
StatisticsCacheKey.class);
- Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId,
k.idxId, k.colName);
+ InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key,
InvalidateStatsTarget.class);
+ AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ TableStatsMeta tableStats =
analysisManager.findTableStatsStatus(target.tableId);
+ if (tableStats == null) {
+ return new TStatus(TStatusCode.OK);
+ }
+ analysisManager.invalidateLocalStats(target.catalogId, target.dbId,
target.tableId, target.columns, tableStats);
return new TStatus(TStatusCode.OK);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 8f927694dc7..fe64fb14142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -58,6 +58,9 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -646,23 +649,16 @@ public class AnalysisManager implements Writable {
}
Set<String> cols = dropStatsStmt.getColumnNames();
+ long catalogId = dropStatsStmt.getCatalogIdId();
+ long dbId = dropStatsStmt.getDbId();
long tblId = dropStatsStmt.getTblId();
TableStatsMeta tableStats =
findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
- if (cols == null) {
- tableStats.reset();
- } else {
- dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn);
- StatisticsCache statisticsCache =
Env.getCurrentEnv().getStatisticsCache();
- for (String col : cols) {
- statisticsCache.syncInvalidate(tblId, -1L, col);
- }
- tableStats.updatedTime = 0;
- }
- tableStats.userInjected = false;
- logCreateTableStats(tableStats);
+ invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats);
+ // Drop stats ddl is master only operation.
+ invalidateRemoteStats(catalogId, dbId, tblId, cols,
dropStatsStmt.isAllColumns());
StatisticsRepository.dropStatistics(tblId, cols);
}
@@ -671,15 +667,55 @@ public class AnalysisManager implements Writable {
if (tableStats == null) {
return;
}
+ long catalogId = table.getDatabase().getCatalog().getId();
+ long dbId = table.getDatabase().getId();
+ long tableId = table.getId();
Set<String> cols =
table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
+ invalidateLocalStats(catalogId, dbId, tableId, cols, tableStats);
+ // Drop stats ddl is master only operation.
+ invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
+ StatisticsRepository.dropStatistics(table.getId(), cols);
+ }
+
+ public void invalidateLocalStats(long catalogId, long dbId, long tableId,
+ Set<String> columns, TableStatsMeta
tableStats) {
+ if (tableStats == null) {
+ return;
+ }
StatisticsCache statisticsCache =
Env.getCurrentEnv().getStatisticsCache();
- for (String col : cols) {
- tableStats.removeColumn(col);
- statisticsCache.syncInvalidate(table.getId(), -1L, col);
+ if (columns == null) {
+ TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
+ columns =
table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
+ }
+ for (String column : columns) {
+ tableStats.removeColumn(column);
+ statisticsCache.invalidate(tableId, -1, column);
}
tableStats.updatedTime = 0;
- logCreateTableStats(tableStats);
- StatisticsRepository.dropStatistics(table.getId(), cols);
+ tableStats.userInjected = false;
+ }
+
+ public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
+ Set<String> columns, boolean
isAllColumns) {
+ InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId,
dbId, tableId, columns, isAllColumns);
+ TInvalidateFollowerStatsCacheRequest request = new
TInvalidateFollowerStatsCacheRequest();
+ request.key = GsonUtils.GSON.toJson(target);
+ StatisticsCache statisticsCache =
Env.getCurrentEnv().getStatisticsCache();
+ SystemInfoService.HostInfo selfNode =
Env.getCurrentEnv().getSelfNode();
+ boolean success = true;
+ for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
+ // Skip master
+ if (selfNode.equals(frontend.getHost())) {
+ continue;
+ }
+ success = success && statisticsCache.invalidateStats(frontend,
request);
+ }
+ if (!success) {
+ // If any rpc failed, use edit log to sync table stats to
non-master FEs.
+ LOG.warn("Failed to invalidate all remote stats by rpc for table
{}, use edit log.", tableId);
+ TableStatsMeta tableStats = findTableStatsStatus(tableId);
+ logCreateTableStats(tableStats);
+ }
}
public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt)
throws DdlException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
new file mode 100644
index 00000000000..e49048f8946
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Set;
+
+public class InvalidateStatsTarget {
+
+ @SerializedName("catalogId")
+ public final long catalogId;
+
+ @SerializedName("dbId")
+ public final long dbId;
+
+ @SerializedName("tableId")
+ public final long tableId;
+
+ @SerializedName("columns")
+ public final Set<String> columns;
+
+ public InvalidateStatsTarget(long catalogId, long dbId, long tableId,
Set<String> columns, boolean isAllColumns) {
+ this.catalogId = catalogId;
+ this.dbId = dbId;
+ this.tableId = tableId;
+ if (isAllColumns) {
+ this.columns = null;
+ } else {
+ this.columns = columns;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index fbec9a60fa0..0cf2808222e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -137,19 +137,6 @@ public class StatisticsCache {
columnStatisticsCache.synchronous().invalidate(new
StatisticsCacheKey(tblId, idxId, colName));
}
- public void syncInvalidate(long tblId, long idxId, String colName) {
- StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId,
colName);
- columnStatisticsCache.synchronous().invalidate(cacheKey);
- TInvalidateFollowerStatsCacheRequest request = new
TInvalidateFollowerStatsCacheRequest();
- request.key = GsonUtils.GSON.toJson(cacheKey);
- for (Frontend frontend :
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
- if (StatisticsUtil.isMaster(frontend)) {
- continue;
- }
- invalidateStats(frontend, request);
- }
- }
-
public void updateColStatsCache(long tblId, long idxId, String colName,
ColumnStatistic statistic) {
columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId,
idxId, colName), Optional.of(statistic));
}
@@ -261,7 +248,7 @@ public class StatisticsCache {
}
@VisibleForTesting
- public void invalidateStats(Frontend frontend,
TInvalidateFollowerStatsCacheRequest request) {
+ public boolean invalidateStats(Frontend frontend,
TInvalidateFollowerStatsCacheRequest request) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(),
frontend.getRpcPort());
FrontendService.Client client = null;
try {
@@ -269,11 +256,13 @@ public class StatisticsCache {
client.invalidateStatsCache(request);
} catch (Throwable t) {
LOG.warn("Failed to sync invalidate to follower: {}", address, t);
+ return false;
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
+ return true;
}
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]