This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bea5def48e2 [improvement](cloud) reduce call for partition get visible
version (#36309)
bea5def48e2 is described below
commit bea5def48e20e7ccd357729c781c816d7e07c7c3
Author: yujun <[email protected]>
AuthorDate: Mon Jun 17 09:50:46 2024 +0800
[improvement](cloud) reduce call for partition get visible version (#36309)
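In cloud mode, every call to Partition.getVisibleVersion() requires an RPC, so this
change reduces the number of calls: the version is read once per partition into a
local variable, and callers that iterate over many partitions fetch all versions at
once via the new batch method Partition.getVisibleVersions().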
---
.../main/java/org/apache/doris/backup/RestoreJob.java | 5 +++--
.../main/java/org/apache/doris/catalog/OlapTable.java | 3 ++-
.../main/java/org/apache/doris/catalog/Partition.java | 12 ++++++++++++
.../apache/doris/common/proc/TabletHealthProcDir.java | 18 ++++++++++++++----
.../java/org/apache/doris/planner/OlapTableSink.java | 18 ++++++++++++++----
.../main/java/org/apache/doris/system/Diagnoser.java | 5 +++--
6 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 6928f8ef2f1..859d3ac5f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1800,13 +1800,14 @@ public class RestoreJob extends AbstractJob {
// update partition visible version
part.updateVersionForRestore(entry.getValue());
+ long visibleVersion = part.getVisibleVersion();
// we also need to update the replica version of these
overwritten restored partitions
for (MaterializedIndex idx :
part.getMaterializedIndices(IndexExtState.VISIBLE)) {
for (Tablet tablet : idx.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- if
(!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) {
-
replica.updateVersion(part.getVisibleVersion());
+ if
(!replica.checkVersionCatchUp(visibleVersion, false)) {
+ replica.updateVersion(visibleVersion);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 406c6d61793..479d214b7a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -675,6 +675,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
Map<Tag, Integer> nextIndexes = Maps.newHashMap();
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
Partition partition = entry.getValue();
+ long visibleVersion = partition.getVisibleVersion();
// entry.getKey() is the new partition id, use it to get the
restore specified
// replica allocation
ReplicaAllocation replicaAlloc =
partitionInfo.getReplicaAllocation(entry.getKey());
@@ -717,7 +718,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
for (Long beId : entry3.getValue()) {
long newReplicaId = env.getNextId();
Replica replica = new Replica(newReplicaId,
beId, ReplicaState.NORMAL,
- partition.getVisibleVersion(),
schemaHash);
+ visibleVersion, schemaHash);
newTablet.addReplica(replica, true /* is
restore */);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 1c9ec4e49a3..bb09773112d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -20,9 +20,12 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.rpc.RpcException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -37,6 +40,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Internal representation of partition-related metadata.
@@ -169,6 +173,14 @@ public class Partition extends MetaObject implements
Writable {
return visibleVersionTime;
}
+ public static List<Long> getVisibleVersions(List<? extends Partition>
partitions) throws RpcException {
+ if (Config.isCloudMode()) {
+ return
CloudPartition.getSnapshotVisibleVersion((List<CloudPartition>) partitions);
+ } else {
+ return
partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList());
+ }
+ }
+
/**
* if visibleVersion is 1, do not return creation time but 0
*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index e28c74c327e..e858b7f58c5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -33,6 +33,7 @@ import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
@@ -40,6 +41,7 @@ import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Comparator;
@@ -180,7 +182,16 @@ public class TabletHealthProcDir implements
ProcDirInterface {
? colocateTableIndex.getGroup(olapTable.getId()) :
null;
olapTable.readLock();
try {
- for (Partition partition : olapTable.getAllPartitions()) {
+ List<Partition> partitions =
Lists.newArrayList(olapTable.getAllPartitions());
+ List<Long> visibleVersions = null;
+ try {
+ visibleVersions =
Partition.getVisibleVersions(partitions);
+ } catch (RpcException e) {
+ throw new RuntimeException("get version from meta
service failed:" + e.getMessage());
+ }
+ for (int j = 0; j < partitions.size(); j++) {
+ Partition partition = partitions.get(j);
+ long visibleVersion = visibleVersions.get(j);
ReplicaAllocation replicaAlloc =
olapTable.getPartitionInfo()
.getReplicaAllocation(partition.getId());
for (MaterializedIndex materializedIndex :
partition.getMaterializedIndices(
@@ -196,12 +207,11 @@ public class TabletHealthProcDir implements
ProcDirInterface {
replicaAlloc =
groupSchema.getReplicaAlloc();
}
Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, i);
- res =
tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
- backendsSet);
+ res =
tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet);
} else {
Pair<Tablet.TabletStatus,
TabletSchedCtx.Priority> pair
=
tablet.getHealthStatusWithPriority(infoService,
- partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
+ visibleVersion, replicaAlloc,
aliveBeIds);
res = pair.first;
}
switch (res) { // CHECKSTYLE IGNORE THIS LINE:
missing switch default
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 7e27dd8a606..8c40e467338 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -55,6 +55,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
@@ -623,8 +624,17 @@ public class OlapTableSink extends DataSink {
TOlapTableLocationParam slaveLocationParam = new
TOlapTableLocationParam();
// BE id -> path hash
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
- for (Long partitionId : partitionIds) {
- Partition partition = table.getPartition(partitionId);
+ List<Partition> partitions = partitionIds.stream().map(partitionId ->
table.getPartition(partitionId))
+ .collect(Collectors.toList());
+ List<Long> visibleVersions = null;
+ try {
+ visibleVersions = Partition.getVisibleVersions(partitions);
+ } catch (RpcException e) {
+ throw new UserException("OlapTableSink get partition visible
version failed", e);
+ }
+ for (int i = 0; i < partitions.size(); i++) {
+ Partition partition = partitions.get(i);
+ long visibleVersion = visibleVersions.get(i);
int loadRequiredReplicaNum =
table.getLoadRequiredReplicaNum(partition.getId());
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
// we should ensure the replica backend is alive
@@ -635,14 +645,14 @@ public class OlapTableSink extends DataSink {
String errMsg = "tablet " + tablet.getId() + " alive
replica num " + bePathsMap.keySet().size()
+ " < load required replica num " +
loadRequiredReplicaNum
+ ", alive backends: [" +
StringUtils.join(bePathsMap.keySet(), ",") + "]"
- + ", detail: " +
tablet.getDetailsStatusForQuery(partition.getVisibleVersion());
+ + ", detail: " +
tablet.getDetailsStatusForQuery(visibleVersion);
if (Config.isCloudMode()) {
errMsg += ConnectContext.cloudNoBackendsReason();
}
throw new
UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg);
}
- debugWriteRandomChooseSink(tablet,
partition.getVisibleVersion(), bePathsMap);
+ debugWriteRandomChooseSink(tablet, visibleVersion,
bePathsMap);
if (bePathsMap.keySet().isEmpty()) {
throw new
UserException(InternalErrorCode.REPLICA_FEW_ERR,
"tablet " + tablet.getId() + " no available
replica");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
index 5e7748a3524..c2a091d11c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
@@ -109,6 +109,7 @@ public class Diagnoser {
StringBuilder versionErr = new StringBuilder();
StringBuilder statusErr = new StringBuilder();
StringBuilder compactionErr = new StringBuilder();
+ long visibleVersion = partition.getVisibleVersion();
for (Replica replica : replicas) {
// backend
do {
@@ -139,10 +140,10 @@ public class Diagnoser {
}
} while (false);
// version
- if (replica.getVersion() != partition.getVisibleVersion()) {
+ if (replica.getVersion() != visibleVersion) {
versionErr.append("Replica on backend " +
replica.getBackendId() + "'s version ("
+ replica.getVersion() + ") does not equal"
- + " to partition visible version (" +
partition.getVisibleVersion() + ")");
+ + " to partition visible version (" + visibleVersion +
")");
} else if (replica.getLastFailedVersion() != -1) {
versionErr.append("Replica on backend " +
replica.getBackendId() + "'s last failed version is "
+ replica.getLastFailedVersion());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]