This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit dde5ed5231bc9aeb1d889754fdd3732d9017185d Author: Jack Drogon <[email protected]> AuthorDate: Tue Jan 23 20:56:39 2024 +0800 [fix](fe-memory) Fix fe schema change high memory usage (#30231) Signed-off-by: Jack Drogon <[email protected]> --- .../src/main/java/org/apache/doris/alter/RollupJobV2.java | 3 +++ .../java/org/apache/doris/alter/SchemaChangeJobV2.java | 7 ++++--- .../java/org/apache/doris/analysis/DescriptorTable.java | 7 +++++++ .../main/java/org/apache/doris/task/AlterReplicaTask.java | 15 +++++++++++---- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index aa61eaf9062..4ad9e0fcb72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -62,6 +62,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -386,6 +387,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } tbl.readLock(); + Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) { @@ -465,6 +467,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), + tcloumnsPool, whereClause); rollupBatchTask.addTask(rollupTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 5c74164ae33..08e27973417 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -55,6 +55,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -234,7 +235,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.readLock(); try { - Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); for (long partitionId : partitionIndexMap.rowKeySet()) { @@ -407,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } tbl.readLock(); - + Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap(); try { Map<String, Column> indexColumnMap = Maps.newHashMap(); for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) { @@ -470,7 +470,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null); + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool, + null); schemaChangeBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 42c69bba6e7..fb6cc7df0a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -54,6 +54,8 @@ public class DescriptorTable { private final HashMap<SlotDescriptor, SlotDescriptor> outToIntermediateSlots = new HashMap<>(); + private TDescriptorTable thriftDescTable = null; // serialized version of this + public DescriptorTable() { } @@ -182,6 +184,10 @@ public class DescriptorTable { } public TDescriptorTable toThrift() { + if (thriftDescTable != null) { + return thriftDescTable; + } + TDescriptorTable result = new TDescriptorTable(); Map<Long, TableIf> referencedTbls = Maps.newHashMap(); for (TupleDescriptor tupleD : tupleDescs.values()) { @@ -208,6 +214,7 @@ public class DescriptorTable { for (TableIf tbl : referencedTbls.values()) { result.addToTableDescriptors(tbl.toThrift()); } + thriftDescTable = result; return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index cd6502da762..267a99a12b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -53,6 +53,7 @@ public class AlterReplicaTask extends AgentTask { private Map<String, Expr> defineExprs; private Expr whereClause; private DescriptorTable descTable; + private Map<Object, List<TColumn>> tcloumnsPool; private List<Column> baseSchemaColumns; /** @@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs, - DescriptorTable descTable, List<Column> baseSchemaColumns, Expr whereClause) { + DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, List<TColumn>> tcloumnsPool, + Expr whereClause) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); this.baseTabletId = baseTabletId; @@ -79,6 +81,7 @@ public class AlterReplicaTask extends AgentTask { this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; + this.tcloumnsPool = tcloumnsPool; } public long getBaseTabletId() { @@ -140,9 +143,13 @@ public class AlterReplicaTask extends AgentTask { req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List<TColumn> columns = new ArrayList<TColumn>(); - for (Column column : baseSchemaColumns) { - columns.add(column.toThrift()); + List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns); + if (columns == null) { + columns = new ArrayList<TColumn>(); + for (Column column : baseSchemaColumns) { + columns.add(column.toThrift()); + } + tcloumnsPool.put(baseSchemaColumns, columns); } req.setColumns(columns); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
