This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9563284bc12 branch-3.1: [improve](statistics)Remove write editlog for
update rows, use update editlog. #53149 (#53839)
9563284bc12 is described below
commit 9563284bc1253688fac4c5bf25179d8fa43e2bcd
Author: James <[email protected]>
AuthorDate: Fri Jul 25 10:21:37 2025 +0800
branch-3.1: [improve](statistics)Remove write editlog for update rows, use update editlog. #53149 (#53839)
backport: #53149
---
.../apache/doris/alter/SchemaChangeHandler.java | 4 ++
.../org/apache/doris/alter/SchemaChangeJobV2.java | 5 ++-
.../main/java/org/apache/doris/catalog/Env.java | 11 +++++-
.../apache/doris/datasource/InternalCatalog.java | 11 ++++--
.../java/org/apache/doris/persist/EditLog.java | 5 +++
.../apache/doris/persist/TruncateTableInfo.java | 9 ++++-
.../apache/doris/service/FrontendServiceImpl.java | 6 +--
.../apache/doris/statistics/AnalysisManager.java | 45 ++++++++--------------
.../doris/statistics/StatisticsAutoCollector.java | 3 ++
.../doris/statistics/AnalysisManagerTest.java | 2 +-
10 files changed, 60 insertions(+), 41 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index cfe86abf2e4..c40277d1dd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -3017,6 +3017,7 @@ public class SchemaChangeHandler extends AlterHandler {
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
}
// Drop table column stats after light schema change finished.
+
Env.getCurrentEnv().getAnalysisManager().removeTableStats(olapTable.getId());
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable,
null);
if (isDropIndex) {
@@ -3047,6 +3048,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropColumns(info);
// Drop table column stats after light schema change finished.
+
Env.getCurrentEnv().getAnalysisManager().removeTableStats(olapTable.getId());
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable,
null);
}
LOG.info("finished modify table's add or drop or modify columns.
table: {}, job: {}, is replay: {}",
@@ -3086,6 +3088,7 @@ public class SchemaChangeHandler extends AlterHandler {
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId,
TableType.OLAP);
+ Env.getCurrentEnv().getAnalysisManager().removeTableStats(tableId);
olapTable.writeLock();
try {
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap,
indexes, null, false, jobId, true);
@@ -3236,6 +3239,7 @@ public class SchemaChangeHandler extends AlterHandler {
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId,
TableType.OLAP);
+ Env.getCurrentEnv().getAnalysisManager().removeTableStats(tableId);
olapTable.writeLock();
try {
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap,
newIndexes,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index ad4a6ef60d1..a9065f92389 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -51,6 +51,7 @@ import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
@@ -709,7 +710,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
postProcessOriginIndex();
// Drop table column stats after schema change finished.
- Env.getCurrentEnv().getAnalysisManager().dropStats(tbl, null);
+ AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+ manager.removeTableStats(tbl.getId());
+ manager.dropStats(tbl, null);
}
private void onFinished(OlapTable tbl) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 919bec38443..3f3106266ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5398,7 +5398,9 @@ public class Env {
indexIdToSchemaVersion);
editLog.logColumnRename(info);
LOG.info("rename coloumn[{}] to {}", colName, newColName);
- Env.getCurrentEnv().getAnalysisManager().dropStats(table, null);
+ AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+ manager.removeTableStats(table.getId());
+ manager.dropStats(table, null);
}
}
@@ -5426,6 +5428,7 @@ public class Env {
Database db =
getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
OlapTable table = (OlapTable) db.getTableOrMetaException(tableId,
TableType.OLAP);
+ Env.getCurrentEnv().getAnalysisManager().removeTableStats(tableId);
table.writeLock();
try {
renameColumn(db, table, colName, newColName,
indexIdToSchemaVersion, true);
@@ -6010,6 +6013,12 @@ public class Env {
// In previous versions(before 2.1.8), there is no catalog info in
TruncateTableInfo,
// So if the catalog info is empty, we assume it's internal table.
getInternalCatalog().replayTruncateTable(info);
+ if (info.isEntireTable()) {
+
Env.getCurrentEnv().getAnalysisManager().removeTableStats(info.getTblId());
+ } else {
+
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(info.getUpdateRecords(),
+ info.getDbId(), info.getTblId(), 0);
+ }
} else {
ExternalCatalog ctl = (ExternalCatalog)
catalogMgr.getCatalog(info.getCtl());
if (ctl != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 08d49bf1f90..e3541bae3cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3559,7 +3559,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Map<Long, DistributionInfo> partitionsDistributionInfo =
Maps.newHashMap();
OlapTable copiedTbl;
- boolean truncateEntireTable = partitionNames == null;
+ boolean truncateEntireTable = partitionNames == null ||
partitionNames.isStar();
Database db = (Database) getDbOrDdlException(dbName);
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
@@ -3741,11 +3741,17 @@ public class InternalCatalog implements
CatalogIf<Database> {
// replace
oldPartitions = truncateTableInternal(olapTable, newPartitions,
truncateEntireTable);
+ if (truncateEntireTable) {
+
Env.getCurrentEnv().getAnalysisManager().removeTableStats(olapTable.getId());
+ } else {
+ Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(
+ updateRecords, db.getId(), olapTable.getId(), 0);
+ }
// write edit log
TruncateTableInfo info =
new TruncateTableInfo(db.getId(), db.getFullName(),
olapTable.getId(), olapTable.getName(),
- newPartitions, truncateEntireTable,
rawTruncateSql, oldPartitions);
+ newPartitions, truncateEntireTable,
rawTruncateSql, oldPartitions, updateRecords);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (DdlException e) {
failedCleanCallback.run();
@@ -3759,7 +3765,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
erasePartitionDropBackendReplicas(oldPartitions);
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable,
partitionNames);
-
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords,
db.getId(), olapTable.getId(), 0);
LOG.info("finished to truncate table {}.{}, partitions: {}", dbName,
tableName, partitionNames);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index c372e3b0588..2653bc9dae7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -18,6 +18,7 @@
package org.apache.doris.persist;
import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.alter.AlterJobV2.JobState;
import org.apache.doris.alter.BatchAlterJobPersistInfo;
import org.apache.doris.alter.IndexChangeJob;
import org.apache.doris.analysis.UserIdentity;
@@ -929,6 +930,10 @@ public class EditLog {
break;
case SCHEMA_CHANGE:
env.getSchemaChangeHandler().replayAlterJobV2(alterJob);
+ if
(alterJob.getJobState().equals(JobState.FINISHED)) {
+ AnalysisManager manager =
Env.getCurrentEnv().getAnalysisManager();
+
manager.removeTableStats(alterJob.getTableId());
+ }
break;
default:
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
index be7df995919..702c754af8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
@@ -54,6 +54,8 @@ public class TruncateTableInfo implements Writable {
private String rawSql = "";
@SerializedName(value = "op")
private Map<Long, String> oldPartitions = new HashMap<>();
+ @SerializedName(value = "ur")
+ private Map<Long, Long> updateRecords;
public TruncateTableInfo() {
@@ -61,7 +63,7 @@ public class TruncateTableInfo implements Writable {
// for internal table
public TruncateTableInfo(long dbId, String db, long tblId, String table,
List<Partition> partitions,
- boolean isEntireTable, String rawSql, List<Partition>
oldPartitions) {
+ boolean isEntireTable, String rawSql, List<Partition>
oldPartitions, Map<Long, Long> updateRecords) {
this.dbId = dbId;
this.db = db;
this.tblId = tblId;
@@ -72,6 +74,7 @@ public class TruncateTableInfo implements Writable {
for (Partition partition : oldPartitions) {
this.oldPartitions.put(partition.getId(), partition.getName());
}
+ this.updateRecords = updateRecords;
}
// for external table
@@ -122,6 +125,10 @@ public class TruncateTableInfo implements Writable {
return rawSql;
}
+ public Map<Long, Long> getUpdateRecords() {
+ return updateRecords;
+ }
+
public static TruncateTableInfo read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, TruncateTableInfo.class);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9a034952ca1..0b0ea7e8ff3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -121,7 +121,6 @@ import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdatePartitionStatsTarget;
import org.apache.doris.statistics.query.QueryStats;
-import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
@@ -3616,9 +3615,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
partitionNames = new PartitionNames(false, new
ArrayList<>(target.partitions));
}
if (target.isTruncate) {
- TableIf table = StatisticsUtil.findTable(target.catalogId,
target.dbId, target.tableId);
- analysisManager.submitAsyncDropStatsTask(table, target.catalogId,
target.dbId,
- target.tableId, tableStats, partitionNames, false);
+ analysisManager.submitAsyncDropStatsTask(target.catalogId,
target.dbId,
+ target.tableId, partitionNames, false);
} else {
analysisManager.invalidateLocalStats(target.catalogId,
target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 278d53de702..70d958ef247 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -714,14 +714,10 @@ public class AnalysisManager implements Writable {
public void dropStats(TableIf table, PartitionNames partitionNames) {
try {
- TableStatsMeta tableStats = findTableStatsStatus(table.getId());
- if (tableStats == null) {
- return;
- }
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
- submitAsyncDropStatsTask(table, catalogId, dbId, tableId,
tableStats, partitionNames, true);
+ submitAsyncDropStatsTask(catalogId, dbId, tableId, partitionNames,
true);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
@@ -734,10 +730,9 @@ public class AnalysisManager implements Writable {
private final Set<String> columns;
private final TableStatsMeta tableStats;
private final PartitionNames partitionNames;
- private final TableIf table;
private final boolean isMaster;
- public DropStatsTask(TableIf table, long catalogId, long dbId, long
tableId, Set<String> columns,
+ public DropStatsTask(long catalogId, long dbId, long tableId,
Set<String> columns,
TableStatsMeta tableStats, PartitionNames
partitionNames, boolean isMaster) {
this.catalogId = catalogId;
this.dbId = dbId;
@@ -745,7 +740,6 @@ public class AnalysisManager implements Writable {
this.columns = columns;
this.tableStats = tableStats;
this.partitionNames = partitionNames;
- this.table = table;
this.isMaster = isMaster;
}
@@ -753,11 +747,6 @@ public class AnalysisManager implements Writable {
public void run() {
try {
if (isMaster) {
- if (!table.isPartitionedTable() || partitionNames == null
- || partitionNames.isStar() ||
partitionNames.getPartitionNames() == null) {
- removeTableStats(tableId);
-
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new
TableStatsDeletionLog(tableId));
- }
// Drop stats ddl is master only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar()
@@ -776,11 +765,11 @@ public class AnalysisManager implements Writable {
}
}
- public void submitAsyncDropStatsTask(TableIf table, long catalogId, long
dbId, long tableId,
- TableStatsMeta tableStats,
PartitionNames partitionNames, boolean isMaster) {
+ public void submitAsyncDropStatsTask(long catalogId, long dbId, long
tableId,
+ PartitionNames partitionNames, boolean isMaster) {
try {
- dropStatsExecutors.submit(new DropStatsTask(table, catalogId,
dbId, tableId, null,
- tableStats, partitionNames, isMaster));
+ dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId,
tableId, null,
+ findTableStatsStatus(tableId), partitionNames, isMaster));
} catch (Throwable t) {
LOG.info("Failed to submit async drop stats job. reason: {}",
t.getMessage());
}
@@ -888,9 +877,11 @@ public class AnalysisManager implements Writable {
}
statisticsCache.invalidateStats(frontend, request);
}
- TableStatsMeta tableStats = findTableStatsStatus(tableId);
- if (tableStats != null) {
- logCreateTableStats(tableStats);
+ if (!isTruncate) {
+ TableStatsMeta tableStats = findTableStatsStatus(tableId);
+ if (tableStats != null) {
+ logCreateTableStats(tableStats);
+ }
}
}
@@ -1251,13 +1242,9 @@ public class AnalysisManager implements Writable {
// Invoke this when load transaction finished.
public void updateUpdatedRows(Map<Long, Map<Long, Long>> tabletRecords,
long dbId, long txnId) {
try {
- if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
- return;
- }
UpdateRowsEvent updateRowsEvent = new
UpdateRowsEvent(tabletRecords, dbId);
LOG.info("Update rows transactionId is {}", txnId);
replayUpdateRowsRecord(updateRowsEvent);
- logUpdateRowsRecord(updateRowsEvent);
} catch (Throwable t) {
LOG.warn("Failed to record update rows.", t);
}
@@ -1266,12 +1253,8 @@ public class AnalysisManager implements Writable {
// Invoke this when load truncate table finished.
public void updateUpdatedRows(Map<Long, Long> partitionToUpdateRows, long
dbId, long tableId, long txnId) {
try {
- if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
- return;
- }
UpdateRowsEvent updateRowsEvent = new
UpdateRowsEvent(partitionToUpdateRows, dbId, tableId);
replayUpdateRowsRecord(updateRowsEvent);
- logUpdateRowsRecord(updateRowsEvent);
} catch (Throwable t) {
LOG.warn("Failed to record update rows.", t);
}
@@ -1293,7 +1276,7 @@ public class AnalysisManager implements Writable {
// Set to true means new partition loaded data
public void setNewPartitionLoaded(List<Long> tableIds) {
- if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() ||
tableIds == null || tableIds.isEmpty()) {
+ if (tableIds == null || tableIds.isEmpty()) {
return;
}
for (long tableId : tableIds) {
@@ -1302,7 +1285,9 @@ public class AnalysisManager implements Writable {
statsStatus.partitionChanged.set(true);
}
}
- logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
+ if (Config.isCloudMode() && Env.getCurrentEnv().isMaster() &&
!Env.isCheckpointThread()) {
+ logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
+ }
}
public void updateTableStatsStatus(TableStatsMeta tableStats) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 2f287cca035..512e807c620 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.persist.TableStatsDeletionLog;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -215,6 +216,8 @@ public class StatisticsAutoCollector extends MasterDaemon {
LOG.info("Table {} is empty, remove its old stats and skip auto
analyze it.", table.getName());
// Remove the table's old stats if exists.
if (tableStatsStatus != null &&
!tableStatsStatus.isColumnsStatsEmpty()) {
+ manager.removeTableStats(table.getId());
+ Env.getCurrentEnv().getEditLog().logDeleteTableStats(new
TableStatsDeletionLog(table.getId()));
manager.dropStats(table, null);
}
return null;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index ae9262c4ec7..ee5099dea95 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -658,7 +658,7 @@ public class AnalysisManagerTest {
AnalysisManager analysisManager = new AnalysisManager();
for (int i = 0; i < 20; i++) {
System.out.println("Submit " + i);
- analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null,
null, false);
+ analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, false);
}
Thread.sleep(10000);
System.out.println(count.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]