This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3dd27334bfa [fix](schema change) reduce memory usage of create a good
deal of shadow tablets in schema change process (#36285)
3dd27334bfa is described below
commit 3dd27334bfa24338a6973a368183847c8d74a298
Author: Lightman <[email protected]>
AuthorDate: Sat Jun 15 23:28:12 2024 +0800
[fix](schema change) reduce memory usage of create a good deal of shadow
tablets in schema change process (#36285)
When doing a schema change, many shadow tablets are created, and a new
TCreateTabletReq is allocated for each of them. We can use an object pool to
reduce their memory usage.
---
.../java/org/apache/doris/alter/RollupJobV2.java | 4 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 4 +-
.../java/org/apache/doris/backup/RestoreJob.java | 4 +-
.../apache/doris/datasource/InternalCatalog.java | 3 +-
.../org/apache/doris/master/ReportHandler.java | 3 +-
.../org/apache/doris/task/CreateReplicaTask.java | 56 +++++++++++++++-------
.../java/org/apache/doris/task/AgentTaskTest.java | 4 +-
7 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 93aceb0936b..66912307ed1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -80,6 +80,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -218,6 +219,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
try {
BinlogConfig binlogConfig = new
BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry :
this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
@@ -261,7 +263,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
- binlogConfig);
+ binlogConfig, objectPool);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId),
baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 57ae326374a..f58ca8de261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -72,6 +72,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -226,6 +227,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
try {
Preconditions.checkState(tbl.getState() ==
OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new
BinlogConfig(tbl.getBinlogConfig());
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
@@ -275,7 +277,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
- binlogConfig);
+ binlogConfig, objectPool);
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId,
shadowIdxId)
.get(shadowTabletId), originSchemaHash);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 53f1da8582b..6928f8ef2f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -100,6 +100,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1087,6 +1088,7 @@ public class RestoreJob extends AbstractJob {
} finally {
localTbl.readUnlock();
}
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (MaterializedIndex restoredIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
List<Index> indexes = restoredIdx.getId() ==
localTbl.getBaseIndexId()
@@ -1120,7 +1122,7 @@ public class RestoreJob extends AbstractJob {
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
- binlogConfig);
+ binlogConfig, objectPool);
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
task.setInRestoreMode(true);
batchTask.addTask(task);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 5a2a8c48793..ad1bbe5f72d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1998,6 +1998,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
@@ -2046,7 +2047,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
- tbl.storeRowColumn(), binlogConfig);
+ tbl.storeRowColumn(), binlogConfig, objectPool);
task.setStorageFormat(tbl.getStorageFormat());
task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 27213652d46..e74467ea6db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -775,6 +775,7 @@ public class ReportHandler extends Daemon {
long backendReportVersion) {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
@@ -881,7 +882,7 @@ public class ReportHandler extends Daemon {
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
- binlogConfig);
+ binlogConfig, objectPool);
createReplicaTask.setIsRecoverTask(true);
createReplicaTask.setInvertedIndexStorageFormat(olapTable
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 7a5262ba0ed..1de5d4e8d7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -123,6 +124,8 @@ public class CreateReplicaTask extends AgentTask {
private BinlogConfig binlogConfig;
private List<Integer> clusterKeyIndexes;
+ private Map<Object, Object> objectPool;
+
public CreateReplicaTask(long backendId, long dbId, long tableId, long
partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int
schemaHash, long version,
KeysType keysType, TStorageType storageType,
@@ -144,7 +147,8 @@ public class CreateReplicaTask extends AgentTask {
long timeSeriesCompactionEmptyRowsetsThreshold,
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
- BinlogConfig binlogConfig) {
+ BinlogConfig binlogConfig,
+ Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId,
indexId, tabletId);
this.replicaId = replicaId;
@@ -188,6 +192,7 @@ public class CreateReplicaTask extends AgentTask {
this.timeSeriesCompactionLevelThreshold =
timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
+ this.objectPool = objectPool;
}
public void setIsRecoverTask(boolean isRecoverTask) {
@@ -260,21 +265,32 @@ public class CreateReplicaTask extends AgentTask {
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
- List<TColumn> tColumns = new ArrayList<TColumn>();
+ List<TColumn> tColumns = null;
+ Object tCols = objectPool.get(columns);
+ if (tCols != null) {
+ tColumns = (List<TColumn>) tCols;
+ } else {
+ tColumns = new ArrayList<>();
+ for (int i = 0; i < columns.size(); i++) {
+ Column column = columns.get(i);
+ TColumn tColumn = column.toThrift();
+ // is bloom filter column
+ if (bfColumns != null && bfColumns.contains(column.getName()))
{
+ tColumn.setIsBloomFilterColumn(true);
+ }
+ // when doing schema change, some modified column has a prefix
in name.
+ // this prefix is only used in FE, not visible to BE, so we
should remove this prefix.
+ if
(column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+ tColumn.setColumnName(
+
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
+ }
+ tColumn.setVisible(column.isVisible());
+ tColumns.add(tColumn);
+ }
+ objectPool.put(columns, tColumns);
+ }
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
- TColumn tColumn = column.toThrift();
- // is bloom filter column
- if (bfColumns != null && bfColumns.contains(column.getName())) {
- tColumn.setIsBloomFilterColumn(true);
- }
- // when doing schema change, some modified column has a prefix in
name.
- // this prefix is only used in FE, not visible to BE, so we should
remove this prefix.
- if
(column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
-
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
- }
- tColumn.setVisible(column.isVisible());
- tColumns.add(tColumn);
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
@@ -296,9 +312,15 @@ public class CreateReplicaTask extends AgentTask {
}
}
if (CollectionUtils.isNotEmpty(indexes)) {
- List<TOlapTableIndex> tIndexes = new ArrayList<>();
- for (Index index : indexes) {
- tIndexes.add(index.toThrift());
+ List<TOlapTableIndex> tIndexes = null;
+ Object value = objectPool.get(indexes);
+ if (value != null) {
+ tIndexes = (List<TOlapTableIndex>) value;
+ } else {
+ tIndexes = new ArrayList<>();
+ for (Index index : indexes) {
+ tIndexes.add(index.toThrift());
+ }
}
tSchema.setIndexes(tIndexes);
storageFormat = TStorageFormat.V2;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 3dc4bc4695c..b604076ddba 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -102,12 +102,12 @@ public class AgentTaskTest {
range2 = Range.closedOpen(pk2, pk3);
// create tasks
-
+ Map<Object, Object> objectPool = new HashMap<Object, Object>();
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId,
partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false,
TTabletType.TABLET_TYPE_DISK, null,
- TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, 0, false, null);
+ TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, 0, false, null, objectPool);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1,
schemaHash1, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]