This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this push:
new 14f606fbc97 Support follower sync query columns to master. (#31859)
14f606fbc97 is described below
commit 14f606fbc977009123d30d9ff244fcea2a75e993
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Mar 6 17:05:21 2024 +0800
Support follower sync query columns to master. (#31859)
---
.../main/java/org/apache/doris/catalog/Env.java | 8 ++
.../apache/doris/service/FrontendServiceImpl.java | 8 ++
.../apache/doris/statistics/AnalysisManager.java | 15 +++
.../doris/statistics/FollowerColumnSender.java | 120 +++++++++++++++++++++
.../doris/statistics/HighPriorityColumn.java | 11 ++
.../doris/statistics/StatisticsAutoCollector.java | 21 +++-
gensrc/thrift/FrontendService.thrift | 13 +++
7 files changed, 194 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 823b7f18fda..3f34a6522dc 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -240,6 +240,7 @@ import
org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.FollowerColumnSender;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
@@ -519,6 +520,8 @@ public class Env {
private StatisticsJobAppender statisticsJobAppender;
+ private FollowerColumnSender followerColumnSender;
+
private HiveTransactionMgr hiveTransactionMgr;
private TopicPublisherThread topicPublisherThread;
@@ -1719,6 +1722,11 @@ public class Env {
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
+
+ if (followerColumnSender == null) {
+ followerColumnSender = new FollowerColumnSender();
+ followerColumnSender.start();
+ }
}
// Set global variable 'lower_case_table_names' only when the cluster is
initialized.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f450af207bd..b9d74762d24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -224,6 +224,7 @@ import
org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TStringLiteral;
+import org.apache.doris.thrift.TSyncQueryColumns;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
import org.apache.doris.thrift.TTableQueryStats;
@@ -3678,4 +3679,11 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+    /**
+     * Receives the query columns collected on a follower FE and merges them into this FE's
+     * analysis queues via {@code AnalysisManager.mergeFollowerQueryColumns}.
+     *
+     * <p>Both lists are declared {@code optional} in {@code TSyncQueryColumns}; an unset list is
+     * passed on as an empty list so a partially populated request cannot fail with a
+     * NullPointerException.
+     *
+     * @param request the columns reported by the follower
+     * @return an OK status
+     */
+    @Override
+    public TStatus syncQueryColumns(TSyncQueryColumns request) throws TException {
+        Env.getCurrentEnv().getAnalysisManager().mergeFollowerQueryColumns(
+                request.isSetHighPriorityColumns()
+                        ? request.getHighPriorityColumns() : java.util.Collections.emptyList(),
+                request.isSetMidPriorityColumns()
+                        ? request.getMidPriorityColumns() : java.util.Collections.emptyList());
+        return new TStatus(TStatusCode.OK);
+    }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0ff9e3a9e00..ed66eae0455 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -64,6 +64,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
+import org.apache.doris.thrift.TQueryColumn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -1176,4 +1177,18 @@ public class AnalysisManager implements Writable {
}
}
}
+
+    /**
+     * Merges query columns reported by a follower FE into the local high and mid priority queues.
+     *
+     * <p>For each list, columns are offered in order. Once the target queue rejects an offer
+     * (presumably because it is at capacity; TODO confirm against the queue's declaration),
+     * the remaining columns of that list are dropped.
+     *
+     * @param highColumns high priority columns from the follower; {@code null} is treated as empty
+     * @param midColumns mid priority columns from the follower; {@code null} is treated as empty
+     */
+    public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
+            Collection<TQueryColumn> midColumns) {
+        // Both lists are optional thrift fields, so an RPC caller may hand us null.
+        if (highColumns != null) {
+            for (TQueryColumn c : highColumns) {
+                if (!highPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) {
+                    break;
+                }
+            }
+        }
+        if (midColumns != null) {
+            for (TQueryColumn c : midColumns) {
+                if (!midPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) {
+                    break;
+                }
+            }
+        }
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
new file mode 100644
index 00000000000..181000c1ef2
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryColumn;
+import org.apache.doris.thrift.TSyncQueryColumns;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+/**
+ * Periodically ships the high and mid priority query columns recorded on a non-master FE to the
+ * master FE through the {@code syncQueryColumns} RPC.
+ */
+public class FollowerColumnSender extends MasterDaemon {
+
+    private static final Logger LOG = LogManager.getLogger(FollowerColumnSender.class);
+
+    /** Interval passed to {@link MasterDaemon}, in milliseconds. */
+    public static final long INTERVAL = 5000;
+
+    public FollowerColumnSender() {
+        super("Follower Column Sender", INTERVAL);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!StatisticsUtil.enableAutoAnalyze()) {
+            return;
+        }
+        if (Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        if (Env.isCheckpointThread()) {
+            return;
+        }
+        send();
+    }
+
+    /**
+     * Sends all currently queued columns to the master. This is a no-op on the master itself or
+     * when both queues are empty.
+     *
+     * <p>Queues are drained only after the master has been located, so columns are kept when no
+     * master can be found. Delivery is best effort: if the RPC fails, the drained columns are
+     * dropped.
+     */
+    protected void send() {
+        Env currentEnv = Env.getCurrentEnv();
+        if (currentEnv.isMaster()) {
+            return;
+        }
+        AnalysisManager analysisManager = currentEnv.getAnalysisManager();
+        if (analysisManager.highPriorityColumns.isEmpty() && analysisManager.midPriorityColumns.isEmpty()) {
+            return;
+        }
+        Frontend master = findMaster(currentEnv);
+        if (master == null) {
+            return;
+        }
+        List<TQueryColumn> highPriorityColumns = drain(analysisManager.highPriorityColumns);
+        List<TQueryColumn> midPriorityColumns = drain(analysisManager.midPriorityColumns);
+        TSyncQueryColumns queryColumns = new TSyncQueryColumns();
+        queryColumns.setHighPriorityColumns(highPriorityColumns);
+        queryColumns.setMidPriorityColumns(midPriorityColumns);
+
+        TNetworkAddress address = new TNetworkAddress(master.getHost(), master.getRpcPort());
+        FrontendService.Client client = null;
+        boolean ok = false;
+        try {
+            client = ClientPool.frontendPool.borrowObject(address);
+            client.syncQueryColumns(queryColumns);
+            ok = true;
+            LOG.info("Send {} high priority columns and {} mid priority columns to master.",
+                    highPriorityColumns.size(), midPriorityColumns.size());
+        } catch (Throwable t) {
+            LOG.warn("Failed to sync query columns to master: {}", address, t);
+        } finally {
+            if (client != null) {
+                if (ok) {
+                    ClientPool.frontendPool.returnObject(address, client);
+                } else {
+                    // The connection may be broken; do not hand it back to other borrowers.
+                    ClientPool.frontendPool.invalidateObject(address, client);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the follower FE whose edit-log address equals the current HA leader address. Returns
+     * null (after logging a warning) when the lookup fails or no follower matches.
+     */
+    private Frontend findMaster(Env currentEnv) {
+        try {
+            InetSocketAddress masterAddress = currentEnv.getHaProtocol().getLeader();
+            for (Frontend fe : currentEnv.getFrontends(FrontendNodeType.FOLLOWER)) {
+                InetSocketAddress socketAddress = new InetSocketAddress(fe.getHost(), fe.getEditLogPort());
+                if (socketAddress.equals(masterAddress)) {
+                    return fe;
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to find master FE.", e);
+            return null;
+        }
+        LOG.warn("No master found in cluster.");
+        return null;
+    }
+
+    /**
+     * Removes every element currently in {@code queue} and returns their thrift form. Uses
+     * {@code poll()} rather than copy-then-clear, so elements offered concurrently are either
+     * returned here or left for the next round, never silently discarded.
+     */
+    private static List<TQueryColumn> drain(Queue<HighPriorityColumn> queue) {
+        List<HighPriorityColumn> drained = new ArrayList<>();
+        for (HighPriorityColumn column = queue.poll(); column != null; column = queue.poll()) {
+            drained.add(column);
+        }
+        return drained.stream().map(HighPriorityColumn::toThrift).collect(Collectors.toList());
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
index c4bc20c399a..b2292ef725d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
@@ -17,6 +17,8 @@
package org.apache.doris.statistics;
+import org.apache.doris.thrift.TQueryColumn;
+
import java.util.Objects;
public class HighPriorityColumn {
@@ -52,4 +54,13 @@ public class HighPriorityColumn {
&& this.tblId == otherCriticalColumn.tblId
&& this.colName.equals(otherCriticalColumn.colName);
}
+
+    /**
+     * Converts this column into the thrift {@code TQueryColumn} used for follower-to-master sync.
+     *
+     * <p>The generated setters are used instead of assigning the public fields directly. For
+     * {@code optional} primitive fields (the i64 ids), Thrift-generated Java records "is set" in a
+     * bitfield that only the setters update, and unset optional fields are skipped during
+     * serialization. Direct field assignment would therefore leave the ids out of the RPC payload.
+     *
+     * @return a fully populated thrift column
+     */
+    public TQueryColumn toThrift() {
+        TQueryColumn tQueryColumn = new TQueryColumn();
+        tQueryColumn.setCatalogId(catalogId);
+        tQueryColumn.setDbId(dbId);
+        tQueryColumn.setTblId(tblId);
+        tQueryColumn.setColName(colName);
+        return tQueryColumn;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index c498881bfbf..e0df94b5cb0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -132,6 +132,7 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
}
}
+ // TODO: Need refactor, hard to understand now.
protected boolean needAnalyzeColumn(TableIf table, String column) {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
TableStatsMeta tableStatsStatus =
manager.findTableStatsStatus(table.getId());
@@ -151,11 +152,17 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
return true;
}
+ if (lastAnalyzeUpdateRows > currentUpdatedRows) {
+ // Shouldn't happen. Just in case.
+ return true;
+ }
OlapTable olapTable = (OlapTable) table;
+ long currentRowCount = olapTable.getRowCount();
+ long lastAnalyzeRowCount = columnStatsMeta.rowCount;
if (tableStatsStatus.newPartitionLoaded.get() &&
olapTable.isPartitionColumn(column)) {
return true;
}
- if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) {
+ if (lastAnalyzeRowCount == 0 && currentRowCount > 0) {
return true;
}
if (currentUpdatedRows == lastAnalyzeUpdateRows) {
@@ -163,7 +170,17 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
}
double healthValue = ((double) (currentUpdatedRows -
lastAnalyzeUpdateRows)
/ (double) currentUpdatedRows) * 100.0;
- LOG.info("Column " + column + " health value is " + healthValue);
+ LOG.info("Column " + column + " update rows health value is " +
healthValue);
+ if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) {
+ return true;
+ }
+ if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
+ return true;
+ }
+ if (currentRowCount == 0 && lastAnalyzeRowCount == 0) {
+ return false;
+ }
+ healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) /
(double) currentRowCount) * 100.0;
return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
} else {
if (!(table instanceof HMSExternalTable)) {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 4b67bd1ec9d..1a3b392e61a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1401,6 +1401,18 @@ struct TShowProcessListResult {
1: optional list<list<string>> process_list
}
+// One column referenced by a query, identified by catalog/database/table ids and column name.
+struct TQueryColumn {
+    1: optional i64 catalogId
+    2: optional i64 dbId
+    3: optional i64 tblId
+    4: optional string colName
+}
+
+// Payload of FrontendService.syncQueryColumns: columns a follower FE recorded, grouped by priority.
+struct TSyncQueryColumns {
+    1: optional list<TQueryColumn> highPriorityColumns
+    2: optional list<TQueryColumn> midPriorityColumns
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1485,4 +1497,5 @@ service FrontendService {
Status.TStatus invalidateStatsCache(1:
TInvalidateFollowerStatsCacheRequest request)
TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
+ Status.TStatus syncQueryColumns(1: TSyncQueryColumns request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]