This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9563284bc12 branch-3.1: [improve](statistics)Remove write editlog for update rows, use update editlog. #53149 (#53839)
9563284bc12 is described below

commit 9563284bc1253688fac4c5bf25179d8fa43e2bcd
Author: James <[email protected]>
AuthorDate: Fri Jul 25 10:21:37 2025 +0800

    branch-3.1: [improve](statistics)Remove write editlog for update rows, use update editlog. #53149 (#53839)
    
    backport: #53149
---
 .../apache/doris/alter/SchemaChangeHandler.java    |  4 ++
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  5 ++-
 .../main/java/org/apache/doris/catalog/Env.java    | 11 +++++-
 .../apache/doris/datasource/InternalCatalog.java   | 11 ++++--
 .../java/org/apache/doris/persist/EditLog.java     |  5 +++
 .../apache/doris/persist/TruncateTableInfo.java    |  9 ++++-
 .../apache/doris/service/FrontendServiceImpl.java  |  6 +--
 .../apache/doris/statistics/AnalysisManager.java   | 45 ++++++++--------------
 .../doris/statistics/StatisticsAutoCollector.java  |  3 ++
 .../doris/statistics/AnalysisManagerTest.java      |  2 +-
 10 files changed, 60 insertions(+), 41 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index cfe86abf2e4..c40277d1dd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -3017,6 +3017,7 @@ public class SchemaChangeHandler extends AlterHandler {
                     
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
                 }
                 // Drop table column stats after light schema change finished.
+                
Env.getCurrentEnv().getAnalysisManager().removeTableStats(olapTable.getId());
                 Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, 
null);
 
                 if (isDropIndex) {
@@ -3047,6 +3048,7 @@ public class SchemaChangeHandler extends AlterHandler {
                 }
                 
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropColumns(info);
                 // Drop table column stats after light schema change finished.
+                
Env.getCurrentEnv().getAnalysisManager().removeTableStats(olapTable.getId());
                 Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, 
null);
             }
             LOG.info("finished modify table's add or drop or modify columns. 
table: {}, job: {}, is replay: {}",
@@ -3086,6 +3088,7 @@ public class SchemaChangeHandler extends AlterHandler {
 
         Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
         OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, 
TableType.OLAP);
+        Env.getCurrentEnv().getAnalysisManager().removeTableStats(tableId);
         olapTable.writeLock();
         try {
             modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, 
indexes, null, false, jobId, true);
@@ -3236,6 +3239,7 @@ public class SchemaChangeHandler extends AlterHandler {
 
         Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
         OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, 
TableType.OLAP);
+        Env.getCurrentEnv().getAnalysisManager().removeTableStats(tableId);
         olapTable.writeLock();
         try {
             modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, 
newIndexes,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index ad4a6ef60d1..a9065f92389 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -51,6 +51,7 @@ import org.apache.doris.common.util.DbUtil;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -709,7 +710,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
         postProcessOriginIndex();
         // Drop table column stats after schema change finished.
-        Env.getCurrentEnv().getAnalysisManager().dropStats(tbl, null);
+        AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+        manager.removeTableStats(tbl.getId());
+        manager.dropStats(tbl, null);
     }
 
     private void onFinished(OlapTable tbl) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 919bec38443..3f3106266ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5398,7 +5398,9 @@ public class Env {
                     indexIdToSchemaVersion);
             editLog.logColumnRename(info);
             LOG.info("rename coloumn[{}] to {}", colName, newColName);
-            Env.getCurrentEnv().getAnalysisManager().dropStats(table, null);
+            AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+            manager.removeTableStats(table.getId());
+            manager.dropStats(table, null);
         }
     }
 
@@ -5426,6 +5428,7 @@ public class Env {
 
         Database db = 
getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
         OlapTable table = (OlapTable) db.getTableOrMetaException(tableId, 
TableType.OLAP);
+        Env.getCurrentEnv().getAnalysisManager().removeTableStats(tableId);
         table.writeLock();
         try {
             renameColumn(db, table, colName, newColName, 
indexIdToSchemaVersion, true);
@@ -6010,6 +6013,12 @@ public class Env {
             // In previous versions(before 2.1.8), there is no catalog info in 
TruncateTableInfo,
             // So if the catalog info is empty, we assume it's internal table.
             getInternalCatalog().replayTruncateTable(info);
+            if (info.isEntireTable()) {
+                
Env.getCurrentEnv().getAnalysisManager().removeTableStats(info.getTblId());
+            } else {
+                
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(info.getUpdateRecords(),
+                        info.getDbId(), info.getTblId(), 0);
+            }
         } else {
             ExternalCatalog ctl = (ExternalCatalog) 
catalogMgr.getCatalog(info.getCtl());
             if (ctl != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 08d49bf1f90..e3541bae3cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3559,7 +3559,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         Map<Long, DistributionInfo> partitionsDistributionInfo = 
Maps.newHashMap();
         OlapTable copiedTbl;
 
-        boolean truncateEntireTable = partitionNames == null;
+        boolean truncateEntireTable = partitionNames == null || 
partitionNames.isStar();
 
         Database db = (Database) getDbOrDdlException(dbName);
         OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
@@ -3741,11 +3741,17 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
             // replace
             oldPartitions = truncateTableInternal(olapTable, newPartitions, 
truncateEntireTable);
+            if (truncateEntireTable) {
+                
Env.getCurrentEnv().getAnalysisManager().removeTableStats(olapTable.getId());
+            } else {
+                Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(
+                        updateRecords, db.getId(), olapTable.getId(), 0);
+            }
 
             // write edit log
             TruncateTableInfo info =
                     new TruncateTableInfo(db.getId(), db.getFullName(), 
olapTable.getId(), olapTable.getName(),
-                            newPartitions, truncateEntireTable, 
rawTruncateSql, oldPartitions);
+                            newPartitions, truncateEntireTable, 
rawTruncateSql, oldPartitions, updateRecords);
             Env.getCurrentEnv().getEditLog().logTruncateTable(info);
         } catch (DdlException e) {
             failedCleanCallback.run();
@@ -3759,7 +3765,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         erasePartitionDropBackendReplicas(oldPartitions);
 
         Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, 
partitionNames);
-        
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords, 
db.getId(), olapTable.getId(), 0);
         LOG.info("finished to truncate table {}.{}, partitions: {}", dbName, 
tableName, partitionNames);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index c372e3b0588..2653bc9dae7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -18,6 +18,7 @@
 package org.apache.doris.persist;
 
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.alter.AlterJobV2.JobState;
 import org.apache.doris.alter.BatchAlterJobPersistInfo;
 import org.apache.doris.alter.IndexChangeJob;
 import org.apache.doris.analysis.UserIdentity;
@@ -929,6 +930,10 @@ public class EditLog {
                             break;
                         case SCHEMA_CHANGE:
                             
env.getSchemaChangeHandler().replayAlterJobV2(alterJob);
+                            if 
(alterJob.getJobState().equals(JobState.FINISHED)) {
+                                AnalysisManager manager = 
Env.getCurrentEnv().getAnalysisManager();
+                                
manager.removeTableStats(alterJob.getTableId());
+                            }
                             break;
                         default:
                             break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
index be7df995919..702c754af8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
@@ -54,6 +54,8 @@ public class TruncateTableInfo implements Writable {
     private String rawSql = "";
     @SerializedName(value = "op")
     private Map<Long, String> oldPartitions = new HashMap<>();
+    @SerializedName(value = "ur")
+    private Map<Long, Long> updateRecords;
 
     public TruncateTableInfo() {
 
@@ -61,7 +63,7 @@ public class TruncateTableInfo implements Writable {
 
     // for internal table
     public TruncateTableInfo(long dbId, String db, long tblId, String table, 
List<Partition> partitions,
-            boolean isEntireTable, String rawSql, List<Partition> 
oldPartitions) {
+            boolean isEntireTable, String rawSql, List<Partition> 
oldPartitions, Map<Long, Long> updateRecords) {
         this.dbId = dbId;
         this.db = db;
         this.tblId = tblId;
@@ -72,6 +74,7 @@ public class TruncateTableInfo implements Writable {
         for (Partition partition : oldPartitions) {
             this.oldPartitions.put(partition.getId(), partition.getName());
         }
+        this.updateRecords = updateRecords;
     }
 
     // for external table
@@ -122,6 +125,10 @@ public class TruncateTableInfo implements Writable {
         return rawSql;
     }
 
+    public Map<Long, Long> getUpdateRecords() {
+        return updateRecords;
+    }
+
     public static TruncateTableInfo read(DataInput in) throws IOException {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, TruncateTableInfo.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9a034952ca1..0b0ea7e8ff3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -121,7 +121,6 @@ import org.apache.doris.statistics.StatisticsCacheKey;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.UpdatePartitionStatsTarget;
 import org.apache.doris.statistics.query.QueryStats;
-import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
@@ -3616,9 +3615,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             partitionNames = new PartitionNames(false, new 
ArrayList<>(target.partitions));
         }
         if (target.isTruncate) {
-            TableIf table = StatisticsUtil.findTable(target.catalogId, 
target.dbId, target.tableId);
-            analysisManager.submitAsyncDropStatsTask(table, target.catalogId, 
target.dbId,
-                    target.tableId, tableStats, partitionNames, false);
+            analysisManager.submitAsyncDropStatsTask(target.catalogId, 
target.dbId,
+                    target.tableId, partitionNames, false);
         } else {
             analysisManager.invalidateLocalStats(target.catalogId, 
target.dbId, target.tableId,
                     target.columns, tableStats, partitionNames);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 278d53de702..70d958ef247 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -714,14 +714,10 @@ public class AnalysisManager implements Writable {
 
     public void dropStats(TableIf table, PartitionNames partitionNames) {
         try {
-            TableStatsMeta tableStats = findTableStatsStatus(table.getId());
-            if (tableStats == null) {
-                return;
-            }
             long catalogId = table.getDatabase().getCatalog().getId();
             long dbId = table.getDatabase().getId();
             long tableId = table.getId();
-            submitAsyncDropStatsTask(table, catalogId, dbId, tableId, 
tableStats, partitionNames, true);
+            submitAsyncDropStatsTask(catalogId, dbId, tableId, partitionNames, 
true);
         } catch (Throwable e) {
             LOG.warn("Failed to drop stats for table {}", table.getName(), e);
         }
@@ -734,10 +730,9 @@ public class AnalysisManager implements Writable {
         private final Set<String> columns;
         private final TableStatsMeta tableStats;
         private final PartitionNames partitionNames;
-        private final TableIf table;
         private final boolean isMaster;
 
-        public DropStatsTask(TableIf table, long catalogId, long dbId, long 
tableId, Set<String> columns,
+        public DropStatsTask(long catalogId, long dbId, long tableId, 
Set<String> columns,
                              TableStatsMeta tableStats, PartitionNames 
partitionNames, boolean isMaster) {
             this.catalogId = catalogId;
             this.dbId = dbId;
@@ -745,7 +740,6 @@ public class AnalysisManager implements Writable {
             this.columns = columns;
             this.tableStats = tableStats;
             this.partitionNames = partitionNames;
-            this.table = table;
             this.isMaster = isMaster;
         }
 
@@ -753,11 +747,6 @@ public class AnalysisManager implements Writable {
         public void run() {
             try {
                 if (isMaster) {
-                    if (!table.isPartitionedTable() || partitionNames == null
-                            || partitionNames.isStar() || 
partitionNames.getPartitionNames() == null) {
-                        removeTableStats(tableId);
-                        
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new 
TableStatsDeletionLog(tableId));
-                    }
                     // Drop stats ddl is master only operation.
                     Set<String> partitions = null;
                     if (partitionNames != null && !partitionNames.isStar()
@@ -776,11 +765,11 @@ public class AnalysisManager implements Writable {
         }
     }
 
-    public void submitAsyncDropStatsTask(TableIf table, long catalogId, long 
dbId, long tableId,
-                                         TableStatsMeta tableStats, 
PartitionNames partitionNames, boolean isMaster) {
+    public void submitAsyncDropStatsTask(long catalogId, long dbId, long 
tableId,
+            PartitionNames partitionNames, boolean isMaster) {
         try {
-            dropStatsExecutors.submit(new DropStatsTask(table, catalogId, 
dbId, tableId, null,
-                    tableStats, partitionNames, isMaster));
+            dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, 
tableId, null,
+                    findTableStatsStatus(tableId), partitionNames, isMaster));
         } catch (Throwable t) {
             LOG.info("Failed to submit async drop stats job. reason: {}", 
t.getMessage());
         }
@@ -888,9 +877,11 @@ public class AnalysisManager implements Writable {
             }
             statisticsCache.invalidateStats(frontend, request);
         }
-        TableStatsMeta tableStats = findTableStatsStatus(tableId);
-        if (tableStats != null) {
-            logCreateTableStats(tableStats);
+        if (!isTruncate) {
+            TableStatsMeta tableStats = findTableStatsStatus(tableId);
+            if (tableStats != null) {
+                logCreateTableStats(tableStats);
+            }
         }
     }
 
@@ -1251,13 +1242,9 @@ public class AnalysisManager implements Writable {
     // Invoke this when load transaction finished.
     public void updateUpdatedRows(Map<Long, Map<Long, Long>> tabletRecords, 
long dbId, long txnId) {
         try {
-            if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
-                return;
-            }
             UpdateRowsEvent updateRowsEvent = new 
UpdateRowsEvent(tabletRecords, dbId);
             LOG.info("Update rows transactionId is {}", txnId);
             replayUpdateRowsRecord(updateRowsEvent);
-            logUpdateRowsRecord(updateRowsEvent);
         } catch (Throwable t) {
             LOG.warn("Failed to record update rows.", t);
         }
@@ -1266,12 +1253,8 @@ public class AnalysisManager implements Writable {
     // Invoke this when load truncate table finished.
     public void updateUpdatedRows(Map<Long, Long> partitionToUpdateRows, long 
dbId, long tableId, long txnId) {
         try {
-            if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
-                return;
-            }
             UpdateRowsEvent updateRowsEvent = new 
UpdateRowsEvent(partitionToUpdateRows, dbId, tableId);
             replayUpdateRowsRecord(updateRowsEvent);
-            logUpdateRowsRecord(updateRowsEvent);
         } catch (Throwable t) {
             LOG.warn("Failed to record update rows.", t);
         }
@@ -1293,7 +1276,7 @@ public class AnalysisManager implements Writable {
 
     // Set to true means new partition loaded data
     public void setNewPartitionLoaded(List<Long> tableIds) {
-        if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || 
tableIds == null || tableIds.isEmpty()) {
+        if (tableIds == null || tableIds.isEmpty()) {
             return;
         }
         for (long tableId : tableIds) {
@@ -1302,7 +1285,9 @@ public class AnalysisManager implements Writable {
                 statsStatus.partitionChanged.set(true);
             }
         }
-        logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
+        if (Config.isCloudMode() && Env.getCurrentEnv().isMaster() && 
!Env.isCheckpointThread()) {
+            logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
+        }
     }
 
     public void updateTableStatsStatus(TableStatsMeta tableStats) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 2f287cca035..512e807c620 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.persist.TableStatsDeletionLog;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -215,6 +216,8 @@ public class StatisticsAutoCollector extends MasterDaemon {
             LOG.info("Table {} is empty, remove its old stats and skip auto 
analyze it.", table.getName());
             // Remove the table's old stats if exists.
             if (tableStatsStatus != null && 
!tableStatsStatus.isColumnsStatsEmpty()) {
+                manager.removeTableStats(table.getId());
+                Env.getCurrentEnv().getEditLog().logDeleteTableStats(new 
TableStatsDeletionLog(table.getId()));
                 manager.dropStats(table, null);
             }
             return null;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index ae9262c4ec7..ee5099dea95 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -658,7 +658,7 @@ public class AnalysisManagerTest {
         AnalysisManager analysisManager = new AnalysisManager();
         for (int i = 0; i < 20; i++) {
             System.out.println("Submit " + i);
-            analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null, 
null, false);
+            analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, false);
         }
         Thread.sleep(10000);
         System.out.println(count.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to