This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 39a86005e64 [fix](schema change) reduce memory usage in schema change
process (#36751)
39a86005e64 is described below
commit 39a86005e6477be16c1bb44241d34eb3bdbe4749
Author: Lightman <[email protected]>
AuthorDate: Tue Jun 25 11:41:01 2024 +0800
[fix](schema change) reduce memory usage in schema change process (#36751)
Picks the following pull requests into branch-2.0:
https://github.com/apache/doris/pull/36285
https://github.com/apache/doris/pull/30231
https://github.com/apache/doris/pull/33073
---
.../java/org/apache/doris/alter/RollupJobV2.java | 8 +++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 10 ++--
.../org/apache/doris/analysis/DescriptorTable.java | 6 +++
.../java/org/apache/doris/backup/RestoreJob.java | 4 +-
.../apache/doris/datasource/InternalCatalog.java | 3 +-
.../org/apache/doris/master/ReportHandler.java | 3 +-
.../org/apache/doris/task/AlterReplicaTask.java | 51 ++++++++++++++------
.../org/apache/doris/task/CreateReplicaTask.java | 56 +++++++++++++++-------
.../java/org/apache/doris/task/AgentTaskTest.java | 4 +-
9 files changed, 104 insertions(+), 41 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index c127bd4b8d0..19d8dc13d75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -81,12 +81,14 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -247,6 +249,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
try {
BinlogConfig binlogConfig = new
BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry :
this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
@@ -291,7 +294,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
- binlogConfig);
+ binlogConfig, objectPool);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId),
baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
@@ -401,6 +404,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
}
tbl.readLock();
+ Map<Object, Object> objectPool = new ConcurrentHashMap<Object,
Object>();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry :
this.partitionIdToRollupIndex.entrySet()) {
@@ -479,7 +483,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
partitionId, rollupIndexId, baseIndexId,
rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash,
baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable,
tbl.getSchemaByIndexId(baseIndexId, true),
- whereClause);
+ whereClause, objectPool);
rollupBatchTask.addTask(rollupTask);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index d8c12066df6..d59a6d6bf9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -72,10 +72,12 @@ import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -238,6 +240,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
Preconditions.checkState(tbl.getState() ==
OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new
BinlogConfig(tbl.getBinlogConfig());
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
@@ -286,7 +289,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
- binlogConfig);
+ binlogConfig, objectPool);
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId,
shadowIdxId)
.get(shadowTabletId), originSchemaHash);
@@ -410,7 +413,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
tbl.readLock();
-
+ Map<Object, Object> objectPool = new ConcurrentHashMap<Object,
Object>();
try {
Map<String, Column> indexColumnMap = Maps.newHashMap();
for (Map.Entry<Long, List<Column>> entry :
indexSchemaMap.entrySet()) {
@@ -468,7 +471,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
AlterReplicaTask rollupTask = new
AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId,
originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash,
originSchemaHash, visibleVersion, jobId,
- JobType.SCHEMA_CHANGE, defineExprs,
descTable, originSchemaColumns, null);
+ JobType.SCHEMA_CHANGE, defineExprs,
descTable, originSchemaColumns, null,
+ objectPool);
schemaChangeBatchTask.addTask(rollupTask);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 42c69bba6e7..e3da9c70a54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -54,6 +54,8 @@ public class DescriptorTable {
private final HashMap<SlotDescriptor, SlotDescriptor>
outToIntermediateSlots = new HashMap<>();
+ private TDescriptorTable thriftDescTable = null; // serialized version of
this
+
public DescriptorTable() {
}
@@ -182,6 +184,9 @@ public class DescriptorTable {
}
public TDescriptorTable toThrift() {
+ if (thriftDescTable != null) {
+ return thriftDescTable;
+ }
TDescriptorTable result = new TDescriptorTable();
Map<Long, TableIf> referencedTbls = Maps.newHashMap();
for (TupleDescriptor tupleD : tupleDescs.values()) {
@@ -208,6 +213,7 @@ public class DescriptorTable {
for (TableIf tbl : referencedTbls.values()) {
result.addToTableDescriptors(tbl.toThrift());
}
+ thriftDescTable = result;
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index bf55f539e69..5812679daaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -97,6 +97,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1035,6 +1036,7 @@ public class RestoreJob extends AbstractJob {
} finally {
localTbl.readUnlock();
}
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (MaterializedIndex restoredIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
@@ -1067,7 +1069,7 @@ public class RestoreJob extends AbstractJob {
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
localTbl.isDynamicSchema(),
- binlogConfig);
+ binlogConfig, objectPool);
task.setInRestoreMode(true);
batchTask.addTask(task);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 6e167010ad7..1b2090378d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1866,6 +1866,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
@@ -1909,7 +1910,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
compactionPolicy,
timeSeriesCompactionGoalSizeMbytes,
timeSeriesCompactionFileCountThreshold,
timeSeriesCompactionTimeThresholdSeconds,
timeSeriesCompactionEmptyRowsetsThreshold,
timeSeriesCompactionLevelThreshold,
- storeRowColumn, isDynamicSchema, binlogConfig);
+ storeRowColumn, isDynamicSchema, binlogConfig,
objectPool);
task.setStorageFormat(storageFormat);
batchTask.addTask(task);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 19418577434..13215503f66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -722,6 +722,7 @@ public class ReportHandler extends Daemon {
long backendReportVersion) {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
@@ -827,7 +828,7 @@ public class ReportHandler extends Daemon {
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
olapTable.isDynamicSchema(),
- binlogConfig);
+ binlogConfig, objectPool);
createReplicaTask.setIsRecoverTask(true);
createReplicaBatchTask.addTask(createReplicaTask);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index 178550dd229..2fa7f110bf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -54,6 +54,7 @@ public class AlterReplicaTask extends AgentTask {
private Expr whereClause;
private DescriptorTable descTable;
private List<Column> baseSchemaColumns;
+ private Map<Object, Object> objectPool;
/**
* AlterReplicaTask constructor.
@@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long
partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long
newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType
jobType, Map<String, Expr> defineExprs,
- DescriptorTable descTable, List<Column> baseSchemaColumns, Expr
whereClause) {
+ DescriptorTable descTable, List<Column> baseSchemaColumns, Expr
whereClause,
+ Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId,
rollupIndexId, rollupTabletId);
this.baseTabletId = baseTabletId;
@@ -79,6 +81,7 @@ public class AlterReplicaTask extends AgentTask {
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
+ this.objectPool = objectPool;
}
public long getBaseTabletId() {
@@ -125,27 +128,47 @@ public class AlterReplicaTask extends AgentTask {
}
if (defineExprs != null) {
for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
- List<SlotRef> slots = Lists.newArrayList();
- entry.getValue().collect(SlotRef.class, slots);
- TAlterMaterializedViewParam mvParam = new
TAlterMaterializedViewParam(entry.getKey());
- mvParam.setOriginColumnName(slots.get(0).getColumnName());
- mvParam.setMvExpr(entry.getValue().treeToThrift());
- req.addToMaterializedViewParams(mvParam);
+ Object value = objectPool.get(entry.getKey());
+ if (value == null) {
+ List<SlotRef> slots = Lists.newArrayList();
+ entry.getValue().collect(SlotRef.class, slots);
+ TAlterMaterializedViewParam mvParam = new
TAlterMaterializedViewParam(entry.getKey());
+ mvParam.setOriginColumnName(slots.get(0).getColumnName());
+ mvParam.setMvExpr(entry.getValue().treeToThrift());
+ req.addToMaterializedViewParams(mvParam);
+ objectPool.put(entry.getKey(), mvParam);
+ } else {
+ TAlterMaterializedViewParam mvParam =
(TAlterMaterializedViewParam) value;
+ req.addToMaterializedViewParams(mvParam);
+ }
}
}
if (whereClause != null) {
- TAlterMaterializedViewParam mvParam = new
TAlterMaterializedViewParam(Column.WHERE_SIGN);
- mvParam.setMvExpr(whereClause.treeToThrift());
- req.addToMaterializedViewParams(mvParam);
+ Object value = objectPool.get(Column.WHERE_SIGN);
+ if (value == null) {
+ TAlterMaterializedViewParam mvParam = new
TAlterMaterializedViewParam(Column.WHERE_SIGN);
+ mvParam.setMvExpr(whereClause.treeToThrift());
+ req.addToMaterializedViewParams(mvParam);
+ } else {
+ TAlterMaterializedViewParam mvParam =
(TAlterMaterializedViewParam) value;
+ req.addToMaterializedViewParams(mvParam);
+ }
}
req.setDescTbl(descTable.toThrift());
if (baseSchemaColumns != null) {
- List<TColumn> columns = new ArrayList<TColumn>();
- for (Column column : baseSchemaColumns) {
- columns.add(column.toThrift());
+ Object value = objectPool.get(baseSchemaColumns);
+ if (value == null) {
+ List<TColumn> columns = new ArrayList<TColumn>();
+ for (Column column : baseSchemaColumns) {
+ columns.add(column.toThrift());
+ }
+ req.setColumns(columns);
+ objectPool.put(baseSchemaColumns, columns);
+ } else {
+ List<TColumn> columns = (List<TColumn>) value;
+ req.setColumns(columns);
}
- req.setColumns(columns);
}
return req;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 7388303f439..7ea5c951ca6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -46,6 +46,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -121,6 +122,8 @@ public class CreateReplicaTask extends AgentTask {
private BinlogConfig binlogConfig;
+ private Map<Object, Object> objectPool;
+
public CreateReplicaTask(long backendId, long dbId, long tableId, long
partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int
schemaHash, long version,
KeysType keysType, TStorageType storageType,
@@ -143,7 +146,8 @@ public class CreateReplicaTask extends AgentTask {
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
boolean isDynamicSchema,
- BinlogConfig binlogConfig) {
+ BinlogConfig binlogConfig,
+ Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId,
indexId, tabletId);
this.replicaId = replicaId;
@@ -188,6 +192,7 @@ public class CreateReplicaTask extends AgentTask {
this.timeSeriesCompactionLevelThreshold =
timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
+ this.objectPool = objectPool;
}
public void setIsRecoverTask(boolean isRecoverTask) {
@@ -248,21 +253,32 @@ public class CreateReplicaTask extends AgentTask {
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
- List<TColumn> tColumns = new ArrayList<TColumn>();
+ List<TColumn> tColumns = null;
+ Object tCols = objectPool.get(columns);
+ if (tCols != null) {
+ tColumns = (List<TColumn>) tCols;
+ } else {
+ tColumns = new ArrayList<>();
+ for (int i = 0; i < columns.size(); i++) {
+ Column column = columns.get(i);
+ TColumn tColumn = column.toThrift();
+ // is bloom filter column
+ if (bfColumns != null && bfColumns.contains(column.getName()))
{
+ tColumn.setIsBloomFilterColumn(true);
+ }
+ // when doing schema change, some modified column has a prefix
in name.
+ // this prefix is only used in FE, not visible to BE, so we
should remove this prefix.
+ if
(column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+ tColumn.setColumnName(
+
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
+ }
+ tColumn.setVisible(column.isVisible());
+ tColumns.add(tColumn);
+ }
+ objectPool.put(columns, tColumns);
+ }
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
- TColumn tColumn = column.toThrift();
- // is bloom filter column
- if (bfColumns != null && bfColumns.contains(column.getName())) {
- tColumn.setIsBloomFilterColumn(true);
- }
- // when doing schema change, some modified column has a prefix in
name.
- // this prefix is only used in FE, not visible to BE, so we should
remove this prefix.
- if
(column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
-
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
- }
- tColumn.setVisible(column.isVisible());
- tColumns.add(tColumn);
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
@@ -279,9 +295,15 @@ public class CreateReplicaTask extends AgentTask {
tSchema.setVersionColIdx(versionCol);
if (CollectionUtils.isNotEmpty(indexes)) {
- List<TOlapTableIndex> tIndexes = new ArrayList<>();
- for (Index index : indexes) {
- tIndexes.add(index.toThrift());
+ List<TOlapTableIndex> tIndexes = null;
+ Object value = objectPool.get(indexes);
+ if (value != null) {
+ tIndexes = (List<TOlapTableIndex>) value;
+ } else {
+ tIndexes = new ArrayList<>();
+ for (Index index : indexes) {
+ tIndexes.add(index.toThrift());
+ }
}
tSchema.setIndexes(tIndexes);
storageFormat = TStorageFormat.V2;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 8e89d7c72cb..e9e2312d5ae 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -102,12 +102,12 @@ public class AgentTaskTest {
range2 = Range.closedOpen(pk2, pk3);
// create tasks
-
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId,
partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false,
TTabletType.TABLET_TYPE_DISK, null,
- TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, 0, false, false, null);
+ TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, 0, false, false, null, objectPool);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1,
schemaHash1, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]