This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new fd860825c06 [improvement](checkpoint) checkpoint thread update tablet
invert index #25098 (#25388)
fd860825c06 is described below
commit fd860825c06049f8883b361a35858f382899b6b8
Author: yujun <[email protected]>
AuthorDate: Fri Oct 13 16:23:59 2023 +0800
[improvement](checkpoint) checkpoint thread update tablet invert index
#25098 (#25388)
The checkpoint thread does not update the tablet inverted index, so in the
checkpoint thread TabletInvertedIndex.getTablet/getReplica will return null,
which may cause problems. Fix this by letting the checkpoint thread update the
tablet inverted index as well.
---
.../doris/alter/MaterializedViewHandler.java | 8 +-
.../apache/doris/catalog/CatalogRecycleBin.java | 6 +-
.../main/java/org/apache/doris/catalog/Env.java | 9 +-
.../apache/doris/catalog/TabletInvertedIndex.java | 12 ---
.../org/apache/doris/catalog/TempPartitions.java | 2 +-
.../apache/doris/datasource/InternalCatalog.java | 110 ++++++++++-----------
6 files changed, 61 insertions(+), 86 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index b9f1ba3585f..bf79c8a7359 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -1010,11 +1010,9 @@ public class MaterializedViewHandler extends
AlterHandler {
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex =
partition.deleteRollupIndex(rollupIndexId);
- if (!Env.isCheckpointThread()) {
- // remove from inverted index
- for (Tablet tablet : rollupIndex.getTablets()) {
- invertedIndex.deleteTablet(tablet.getId());
- }
+ // remove from inverted index
+ for (Tablet tablet : rollupIndex.getTablets()) {
+ invertedIndex.deleteTablet(tablet.getId());
}
}
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 759a34ad445..890a76ee24b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -409,7 +409,7 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
RecycleTableInfo tableInfo = idToTable.remove(tableId);
idToRecycleTime.remove(tableId);
Table table = tableInfo.getTable();
- if (table.getType() == TableType.OLAP && !Env.isCheckpointThread()) {
+ if (table.getType() == TableType.OLAP) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, true);
}
LOG.info("replay erase table[{}]", tableId);
@@ -519,9 +519,7 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
Partition partition = partitionInfo.getPartition();
- if (!Env.isCheckpointThread()) {
- Env.getCurrentEnv().onErasePartition(partition);
- }
+ Env.getCurrentEnv().onErasePartition(partition);
LOG.info("replay erase partition[{}]", partitionId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index ea4410f60c6..6748a8b4afc 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1888,10 +1888,8 @@ public class Env {
public long loadRecycleBin(DataInputStream dis, long checksum) throws
IOException {
recycleBin.readFields(dis);
- if (!isCheckpointThread()) {
- // add tablet in Recycle bin to TabletInvertedIndex
- recycleBin.addTabletToInvertedIndex();
- }
+ // add tablet in Recycle bin to TabletInvertedIndex
+ recycleBin.addTabletToInvertedIndex();
// create DatabaseTransactionMgr for db in recycle bin.
// these dbs do not exist in `idToDb` of the catalog.
for (Long dbId : recycleBin.getAllDbIds()) {
@@ -5281,7 +5279,7 @@ public class Env {
}
}
- if (!isReplay) {
+ if (!isReplay && !Env.isCheckpointThread()) {
// drop all replicas
AgentBatchTask batchTask = new AgentBatchTask();
for (Partition partition : olapTable.getAllPartitions()) {
@@ -5305,6 +5303,7 @@ public class Env {
AgentTaskExecutor.submit(batchTask);
}
+ // TODO: does checkpoint need update colocate index ?
// colocation
Env.getCurrentColocateIndex().removeTable(olapTable.getId());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index ece6ebd8b5b..2b601f9f030 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -506,9 +506,6 @@ public class TabletInvertedIndex {
// always add tablet before adding replicas
public void addTablet(long tabletId, TabletMeta tabletMeta) {
- if (Env.isCheckpointThread()) {
- return;
- }
long stamp = writeLock();
try {
if (tabletMetaMap.containsKey(tabletId)) {
@@ -527,9 +524,6 @@ public class TabletInvertedIndex {
}
public void deleteTablet(long tabletId) {
- if (Env.isCheckpointThread()) {
- return;
- }
long stamp = writeLock();
try {
Map<Long, Replica> replicas =
replicaMetaTable.rowMap().remove(tabletId);
@@ -555,9 +549,6 @@ public class TabletInvertedIndex {
}
public void addReplica(long tabletId, Replica replica) {
- if (Env.isCheckpointThread()) {
- return;
- }
long stamp = writeLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
@@ -572,9 +563,6 @@ public class TabletInvertedIndex {
}
public void deleteReplica(long tabletId, long backendId) {
- if (Env.isCheckpointThread()) {
- return;
- }
long stamp = writeLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
index f64d5bed0c3..9cd2d61bf91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
@@ -77,7 +77,7 @@ public class TempPartitions implements Writable,
GsonPostProcessable {
if (partition != null) {
idToPartition.remove(partition.getId());
nameToPartition.remove(partitionName);
- if (!Env.isCheckpointThread() && needDropTablet) {
+ if (needDropTablet) {
TabletInvertedIndex invertedIndex =
Env.getCurrentInvertedIndex();
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 65c8f885604..283ae2dcc67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -361,10 +361,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
* create the tablet inverted index from metadata.
*/
public void recreateTabletInvertIndex() {
- if (Env.isCheckpointThread()) {
- return;
- }
-
// create inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Database db : this.fullNameToDb.values()) {
@@ -1296,31 +1292,31 @@ public class InternalCatalog implements
CatalogIf<Database> {
} catch (DdlException e) {
throw new MetaNotFoundException(e.getMessage());
}
- if (!Env.isCheckpointThread()) {
- // add to inverted index
- if (table.isManagedTable()) {
- TabletInvertedIndex invertedIndex =
Env.getCurrentInvertedIndex();
- OlapTable olapTable = (OlapTable) table;
- long dbId = db.getId();
- long tableId = table.getId();
- for (Partition partition : olapTable.getAllPartitions()) {
- long partitionId = partition.getId();
- TStorageMedium medium =
olapTable.getPartitionInfo().getDataProperty(partitionId)
- .getStorageMedium();
- for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = mIndex.getId();
- int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
- for (Tablet tablet : mIndex.getTablets()) {
- TabletMeta tabletMeta = new TabletMeta(dbId,
tableId, partitionId, indexId, schemaHash,
- medium);
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
+ // add to inverted index
+ if (table.isManagedTable()) {
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ OlapTable olapTable = (OlapTable) table;
+ long dbId = db.getId();
+ long tableId = table.getId();
+ for (Partition partition : olapTable.getAllPartitions()) {
+ long partitionId = partition.getId();
+ TStorageMedium medium =
olapTable.getPartitionInfo().getDataProperty(partitionId)
+ .getStorageMedium();
+ for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = mIndex.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ for (Tablet tablet : mIndex.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash,
+ medium);
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
}
}
- } // end for partitions
+ }
+ } // end for partitions
+ if (!Env.isCheckpointThread()) {
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable,
true);
}
}
@@ -1695,20 +1691,18 @@ public class InternalCatalog implements
CatalogIf<Database> {
partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(),
info.isTempPartition(),
partitionItem, info.getDataProperty(),
info.getReplicaAlloc(), info.isInMemory(), info.isMutable());
- if (!Env.isCheckpointThread()) {
- // add to inverted index
- TabletInvertedIndex invertedIndex =
Env.getCurrentInvertedIndex();
- for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = index.getId();
- int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- for (Tablet tablet : index.getTablets()) {
- TabletMeta tabletMeta = new TabletMeta(info.getDbId(),
info.getTableId(), partition.getId(),
- index.getId(), schemaHash,
info.getDataProperty().getStorageMedium());
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
+ // add to inverted index
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = index.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ for (Tablet tablet : index.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(info.getDbId(),
info.getTableId(), partition.getId(),
+ index.getId(), schemaHash,
info.getDataProperty().getStorageMedium());
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
}
}
}
@@ -3054,24 +3048,22 @@ public class InternalCatalog implements
CatalogIf<Database> {
try {
truncateTableInternal(olapTable, info.getPartitions(),
info.isEntireTable());
- if (!Env.isCheckpointThread()) {
- // add tablet to inverted index
- TabletInvertedIndex invertedIndex =
Env.getCurrentInvertedIndex();
- for (Partition partition : info.getPartitions()) {
- long partitionId = partition.getId();
- TStorageMedium medium =
olapTable.getPartitionInfo().getDataProperty(partitionId)
- .getStorageMedium();
- for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = mIndex.getId();
- int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
- for (Tablet tablet : mIndex.getTablets()) {
- TabletMeta tabletMeta = new TabletMeta(db.getId(),
olapTable.getId(), partitionId, indexId,
- schemaHash, medium);
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
+ // add tablet to inverted index
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ for (Partition partition : info.getPartitions()) {
+ long partitionId = partition.getId();
+ TStorageMedium medium =
olapTable.getPartitionInfo().getDataProperty(partitionId)
+ .getStorageMedium();
+ for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = mIndex.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ for (Tablet tablet : mIndex.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(db.getId(),
olapTable.getId(), partitionId, indexId,
+ schemaHash, medium);
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]