This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dd27334bfa [fix](schema change) reduce memory usage of create a good 
deal of shadow tablets in schema change process (#36285)
3dd27334bfa is described below

commit 3dd27334bfa24338a6973a368183847c8d74a298
Author: Lightman <[email protected]>
AuthorDate: Sat Jun 15 23:28:12 2024 +0800

    [fix](schema change) reduce memory usage of create a good deal of shadow 
tablets in schema change process (#36285)
    
    When doing a schema change, many shadow tablets are created, so many
    TCreateTabletReq objects are instantiated. We can use an object pool to
    reduce their memory usage.
---
 .../java/org/apache/doris/alter/RollupJobV2.java   |  4 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  4 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |  4 +-
 .../apache/doris/datasource/InternalCatalog.java   |  3 +-
 .../org/apache/doris/master/ReportHandler.java     |  3 +-
 .../org/apache/doris/task/CreateReplicaTask.java   | 56 +++++++++++++++-------
 .../java/org/apache/doris/task/AgentTaskTest.java  |  4 +-
 7 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 93aceb0936b..66912307ed1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -80,6 +80,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -218,6 +219,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         try {
             BinlogConfig binlogConfig = new 
BinlogConfig(tbl.getBinlogConfig());
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+            Map<Object, Object> objectPool = new HashMap<Object, Object>();
             for (Map.Entry<Long, MaterializedIndex> entry : 
this.partitionIdToRollupIndex.entrySet()) {
                 long partitionId = entry.getKey();
                 Partition partition = tbl.getPartition(partitionId);
@@ -261,7 +263,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                                 
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                                 tbl.getTimeSeriesCompactionLevelThreshold(),
                                 tbl.storeRowColumn(),
-                                binlogConfig);
+                                binlogConfig, objectPool);
                         
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), 
baseSchemaHash);
                         if (this.storageFormat != null) {
                             
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 57ae326374a..f58ca8de261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -72,6 +72,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -226,6 +227,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         try {
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
             BinlogConfig binlogConfig = new 
BinlogConfig(tbl.getBinlogConfig());
+            Map<Object, Object> objectPool = new HashMap<Object, Object>();
             for (long partitionId : partitionIndexMap.rowKeySet()) {
                 Partition partition = tbl.getPartition(partitionId);
                 if (partition == null) {
@@ -275,7 +277,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                                     
tbl.getTimeSeriesCompactionLevelThreshold(),
                                     tbl.storeRowColumn(),
-                                    binlogConfig);
+                                    binlogConfig, objectPool);
 
                             
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, 
shadowIdxId)
                                     .get(shadowTabletId), originSchemaHash);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 53f1da8582b..6928f8ef2f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -100,6 +100,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1087,6 +1088,7 @@ public class RestoreJob extends AbstractJob {
         } finally {
             localTbl.readUnlock();
         }
+        Map<Object, Object> objectPool = new HashMap<Object, Object>();
         for (MaterializedIndex restoredIdx : 
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
             MaterializedIndexMeta indexMeta = 
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             List<Index> indexes = restoredIdx.getId() == 
localTbl.getBaseIndexId()
@@ -1120,7 +1122,7 @@ public class RestoreJob extends AbstractJob {
                             
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                             localTbl.getTimeSeriesCompactionLevelThreshold(),
                             localTbl.storeRowColumn(),
-                            binlogConfig);
+                            binlogConfig, objectPool);
                     
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
                     task.setInRestoreMode(true);
                     batchTask.addTask(task);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 5a2a8c48793..ad1bbe5f72d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1998,6 +1998,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
         short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
         TStorageMedium realStorageMedium = null;
+        Map<Object, Object> objectPool = new HashMap<Object, Object>();
         for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
             long indexId = entry.getKey();
             MaterializedIndex index = entry.getValue();
@@ -2046,7 +2047,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                             tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                             tbl.getTimeSeriesCompactionLevelThreshold(),
-                            tbl.storeRowColumn(), binlogConfig);
+                            tbl.storeRowColumn(), binlogConfig, objectPool);
 
                     task.setStorageFormat(tbl.getStorageFormat());
                     
task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 27213652d46..e74467ea6db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -775,6 +775,7 @@ public class ReportHandler extends Daemon {
                                        long backendReportVersion) {
         AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        Map<Object, Object> objectPool = new HashMap<Object, Object>();
         for (Long dbId : tabletDeleteFromMeta.keySet()) {
             Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
             if (db == null) {
@@ -881,7 +882,7 @@ public class ReportHandler extends Daemon {
                                             
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                                             
olapTable.getTimeSeriesCompactionLevelThreshold(),
                                             olapTable.storeRowColumn(),
-                                            binlogConfig);
+                                            binlogConfig, objectPool);
 
                                     createReplicaTask.setIsRecoverTask(true);
                                     
createReplicaTask.setInvertedIndexStorageFormat(olapTable
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 7a5262ba0ed..1de5d4e8d7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -123,6 +124,8 @@ public class CreateReplicaTask extends AgentTask {
     private BinlogConfig binlogConfig;
     private List<Integer> clusterKeyIndexes;
 
+    private Map<Object, Object> objectPool;
+
     public CreateReplicaTask(long backendId, long dbId, long tableId, long 
partitionId, long indexId, long tabletId,
                              long replicaId, short shortKeyColumnCount, int 
schemaHash, long version,
                              KeysType keysType, TStorageType storageType,
@@ -144,7 +147,8 @@ public class CreateReplicaTask extends AgentTask {
                              long timeSeriesCompactionEmptyRowsetsThreshold,
                              long timeSeriesCompactionLevelThreshold,
                              boolean storeRowColumn,
-                             BinlogConfig binlogConfig) {
+                             BinlogConfig binlogConfig,
+                             Map<Object, Object> objectPool) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, 
indexId, tabletId);
 
         this.replicaId = replicaId;
@@ -188,6 +192,7 @@ public class CreateReplicaTask extends AgentTask {
         this.timeSeriesCompactionLevelThreshold = 
timeSeriesCompactionLevelThreshold;
         this.storeRowColumn = storeRowColumn;
         this.binlogConfig = binlogConfig;
+        this.objectPool = objectPool;
     }
 
     public void setIsRecoverTask(boolean isRecoverTask) {
@@ -260,21 +265,32 @@ public class CreateReplicaTask extends AgentTask {
         int deleteSign = -1;
         int sequenceCol = -1;
         int versionCol = -1;
-        List<TColumn> tColumns = new ArrayList<TColumn>();
+        List<TColumn> tColumns = null;
+        Object tCols = objectPool.get(columns);
+        if (tCols != null) {
+            tColumns = (List<TColumn>) tCols;
+        } else {
+            tColumns = new ArrayList<>();
+            for (int i = 0; i < columns.size(); i++) {
+                Column column = columns.get(i);
+                TColumn tColumn = column.toThrift();
+                // is bloom filter column
+                if (bfColumns != null && bfColumns.contains(column.getName())) 
{
+                    tColumn.setIsBloomFilterColumn(true);
+                }
+                // when doing schema change, some modified column has a prefix 
in name.
+                // this prefix is only used in FE, not visible to BE, so we 
should remove this prefix.
+                if 
(column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+                    tColumn.setColumnName(
+                            
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
+                }
+                tColumn.setVisible(column.isVisible());
+                tColumns.add(tColumn);
+            }
+            objectPool.put(columns, tColumns);
+        }
         for (int i = 0; i < columns.size(); i++) {
             Column column = columns.get(i);
-            TColumn tColumn = column.toThrift();
-            // is bloom filter column
-            if (bfColumns != null && bfColumns.contains(column.getName())) {
-                tColumn.setIsBloomFilterColumn(true);
-            }
-            // when doing schema change, some modified column has a prefix in 
name.
-            // this prefix is only used in FE, not visible to BE, so we should 
remove this prefix.
-            if 
(column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
-                
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
-            }
-            tColumn.setVisible(column.isVisible());
-            tColumns.add(tColumn);
             if (column.isDeleteSignColumn()) {
                 deleteSign = i;
             }
@@ -296,9 +312,15 @@ public class CreateReplicaTask extends AgentTask {
             }
         }
         if (CollectionUtils.isNotEmpty(indexes)) {
-            List<TOlapTableIndex> tIndexes = new ArrayList<>();
-            for (Index index : indexes) {
-                tIndexes.add(index.toThrift());
+            List<TOlapTableIndex> tIndexes = null;
+            Object value = objectPool.get(indexes);
+            if (value != null) {
+                tIndexes = (List<TOlapTableIndex>) value;
+            } else {
+                tIndexes = new ArrayList<>();
+                for (Index index : indexes) {
+                    tIndexes.add(index.toThrift());
+                }
             }
             tSchema.setIndexes(tIndexes);
             storageFormat = TStorageFormat.V2;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 3dc4bc4695c..b604076ddba 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -102,12 +102,12 @@ public class AgentTaskTest {
         range2 = Range.closedOpen(pk2, pk3);
 
         // create tasks
-
+        Map<Object, Object> objectPool = new HashMap<Object, Object>();
         // create
         createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, 
partitionId,
                 indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, 
version, KeysType.AGG_KEYS, storageType,
                 TStorageMedium.SSD, columns, null, 0, latch, null, false, 
TTabletType.TABLET_TYPE_DISK, null,
-                TCompressionType.LZ4F, false, "", false, false, false, "", 0, 
0, 0, 0, 0, false, null);
+                TCompressionType.LZ4F, false, "", false, false, false, "", 0, 
0, 0, 0, 0, false, null, objectPool);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, 
schemaHash1, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to