This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dde5ed5231bc9aeb1d889754fdd3732d9017185d
Author: Jack Drogon <[email protected]>
AuthorDate: Tue Jan 23 20:56:39 2024 +0800

    [fix](fe-memory) Fix fe schema change high memory usage (#30231)
    
    Signed-off-by: Jack Drogon <[email protected]>
---
 .../src/main/java/org/apache/doris/alter/RollupJobV2.java |  3 +++
 .../java/org/apache/doris/alter/SchemaChangeJobV2.java    |  7 ++++---
 .../java/org/apache/doris/analysis/DescriptorTable.java   |  7 +++++++
 .../main/java/org/apache/doris/task/AlterReplicaTask.java | 15 +++++++++++----
 4 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index aa61eaf9062..4ad9e0fcb72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -62,6 +62,7 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterReplicaTask;
 import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -386,6 +387,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
         }
 
         tbl.readLock();
+        Map<Object, List<TColumn>> tcloumnsPool  = Maps.newHashMap();
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
             for (Map.Entry<Long, MaterializedIndex> entry : 
this.partitionIdToRollupIndex.entrySet()) {
@@ -465,6 +467,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                                 partitionId, rollupIndexId, baseIndexId, 
rollupTabletId, baseTabletId,
                                 rollupReplica.getId(), rollupSchemaHash, 
baseSchemaHash, visibleVersion, jobId,
                                 JobType.ROLLUP, defineExprs, descTable, 
tbl.getSchemaByIndexId(baseIndexId, true),
+                                tcloumnsPool,
                                 whereClause);
                         rollupBatchTask.addTask(rollupTask);
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 5c74164ae33..08e27973417 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -55,6 +55,7 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterReplicaTask;
 import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -234,7 +235,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         tbl.readLock();
         try {
-
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
             BinlogConfig binlogConfig = new 
BinlogConfig(tbl.getBinlogConfig());
             for (long partitionId : partitionIndexMap.rowKeySet()) {
@@ -407,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
 
         tbl.readLock();
-
+        Map<Object, List<TColumn>> tcloumnsPool  = Maps.newHashMap();
         try {
             Map<String, Column> indexColumnMap = Maps.newHashMap();
             for (Map.Entry<Long, List<Column>> entry : 
indexSchemaMap.entrySet()) {
@@ -470,7 +470,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                             AlterReplicaTask rollupTask = new 
AlterReplicaTask(shadowReplica.getBackendId(), dbId,
                                     tableId, partitionId, shadowIdxId, 
originIdxId, shadowTabletId, originTabletId,
                                     shadowReplica.getId(), shadowSchemaHash, 
originSchemaHash, visibleVersion, jobId,
-                                    JobType.SCHEMA_CHANGE, defineExprs, 
descTable, originSchemaColumns, null);
+                                    JobType.SCHEMA_CHANGE, defineExprs, 
descTable, originSchemaColumns, tcloumnsPool,
+                                    null);
                             schemaChangeBatchTask.addTask(rollupTask);
                         }
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 42c69bba6e7..fb6cc7df0a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -54,6 +54,8 @@ public class DescriptorTable {
 
     private final HashMap<SlotDescriptor, SlotDescriptor> 
outToIntermediateSlots = new HashMap<>();
 
+    private TDescriptorTable thriftDescTable = null; // serialized version of 
this
+
     public DescriptorTable() {
     }
 
@@ -182,6 +184,10 @@ public class DescriptorTable {
     }
 
     public TDescriptorTable toThrift() {
+        if (thriftDescTable != null) {
+            return thriftDescTable;
+        }
+
         TDescriptorTable result = new TDescriptorTable();
         Map<Long, TableIf> referencedTbls = Maps.newHashMap();
         for (TupleDescriptor tupleD : tupleDescs.values()) {
@@ -208,6 +214,7 @@ public class DescriptorTable {
         for (TableIf tbl : referencedTbls.values()) {
             result.addToTableDescriptors(tbl.toThrift());
         }
+        thriftDescTable = result;
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index cd6502da762..267a99a12b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -53,6 +53,7 @@ public class AlterReplicaTask extends AgentTask {
     private Map<String, Expr> defineExprs;
     private Expr whereClause;
     private DescriptorTable descTable;
+    private Map<Object, List<TColumn>> tcloumnsPool;
     private List<Column> baseSchemaColumns;
 
     /**
@@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask {
     public AlterReplicaTask(long backendId, long dbId, long tableId, long 
partitionId, long rollupIndexId,
             long baseIndexId, long rollupTabletId, long baseTabletId, long 
newReplicaId, int newSchemaHash,
             int baseSchemaHash, long version, long jobId, AlterJobV2.JobType 
jobType, Map<String, Expr> defineExprs,
-            DescriptorTable descTable, List<Column> baseSchemaColumns, Expr 
whereClause) {
+            DescriptorTable descTable, List<Column> baseSchemaColumns, 
Map<Object, List<TColumn>> tcloumnsPool,
+            Expr whereClause) {
         super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, 
rollupIndexId, rollupTabletId);
 
         this.baseTabletId = baseTabletId;
@@ -79,6 +81,7 @@ public class AlterReplicaTask extends AgentTask {
         this.whereClause = whereClause;
         this.descTable = descTable;
         this.baseSchemaColumns = baseSchemaColumns;
+        this.tcloumnsPool = tcloumnsPool;
     }
 
     public long getBaseTabletId() {
@@ -140,9 +143,13 @@ public class AlterReplicaTask extends AgentTask {
         req.setDescTbl(descTable.toThrift());
 
         if (baseSchemaColumns != null) {
-            List<TColumn> columns = new ArrayList<TColumn>();
-            for (Column column : baseSchemaColumns) {
-                columns.add(column.toThrift());
+            List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns);
+            if (columns == null) {
+                columns = new ArrayList<TColumn>();
+                for (Column column : baseSchemaColumns) {
+                    columns.add(column.toThrift());
+                }
+                tcloumnsPool.put(baseSchemaColumns, columns);
             }
             req.setColumns(columns);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to