This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d4d619f45f7 [fix](merge-cloud) Set visible version for OlapScanNode at
the plan phase (#32473)
d4d619f45f7 is described below
commit d4d619f45f7614179fbd4f76a3febd0a57820f68
Author: walter <[email protected]>
AuthorDate: Thu Mar 21 19:23:48 2024 +0800
[fix](merge-cloud) Set visible version for OlapScanNode at the plan phase
(#32473)
This PR fixes Spark reads using an incorrect partition version in cloud
mode.
---
.../apache/doris/cloud/qe/CloudCoordinator.java | 69 ----------------------
.../org/apache/doris/nereids/NereidsPlanner.java | 6 +-
.../org/apache/doris/planner/OriginalPlanner.java | 3 +
.../java/org/apache/doris/planner/Planner.java | 1 -
.../java/org/apache/doris/planner/ScanNode.java | 69 ++++++++++++++++++++++
5 files changed, 77 insertions(+), 71 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
index 2ce8950c12f..503c517ca2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
@@ -20,32 +20,22 @@ package org.apache.doris.cloud.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.catalog.CloudEnv;
-import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
-import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
-import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
public class CloudCoordinator extends Coordinator {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -100,63 +90,4 @@ public class CloudCoordinator extends Coordinator {
+ clusterName);
}
}
-
- @Override
- protected void computeScanRangeAssignment() throws Exception {
- setVisibleVersionForOlapScanNode();
- super.computeScanRangeAssignment();
- }
-
- // In cloud mode, meta read lock is not enough to keep a snapshot of the
partition versions.
- // After all scan node are collected, it is possible to gain a snapshot of
the partition version.
- private void setVisibleVersionForOlapScanNode() throws RpcException,
UserException {
- List<CloudPartition> partitions = new ArrayList<>();
- Set<Long> partitionSet = new HashSet<>();
- for (ScanNode node : scanNodes) {
- if (!(node instanceof OlapScanNode)) {
- continue;
- }
-
- OlapScanNode scanNode = (OlapScanNode) node;
- OlapTable table = scanNode.getOlapTable();
- for (Long id : scanNode.getSelectedPartitionIds()) {
- if (!partitionSet.contains(id)) {
- partitionSet.add(id);
- partitions.add((CloudPartition) table.getPartition(id));
- }
- }
- }
-
- if (partitions.isEmpty()) {
- return;
- }
-
- List<Long> versions =
CloudPartition.getSnapshotVisibleVersion(partitions);
- assert versions.size() == partitions.size() : "the got num versions is
not equals to acquired num versions";
- if (versions.stream().anyMatch(x -> x <= 0)) {
- int size = versions.size();
- for (int i = 0; i < size; ++i) {
- if (versions.get(i) <= 0) {
- LOG.warn("partition {} getVisibleVersion error, the
visibleVersion is {}",
- partitions.get(i).getId(), versions.get(i));
- throw new UserException("partition " +
partitions.get(i).getId()
- + " getVisibleVersion error, the visibleVersion is " +
versions.get(i));
- }
- }
- }
-
- // ATTN: the table ids are ignored here because the both id are
allocated from a same id generator.
- Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
- .boxed()
- .collect(Collectors.toMap(i -> partitions.get(i).getId(),
versions::get));
-
- for (ScanNode node : scanNodes) {
- if (!(node instanceof OlapScanNode)) {
- continue;
- }
-
- OlapScanNode scanNode = (OlapScanNode) node;
- scanNode.updateScanRangeVersions(visibleVersionMap);
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index eedc77e9df7..424cdc5dd58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -101,7 +102,7 @@ public class NereidsPlanner extends Planner {
}
@Override
- public void plan(StatementBase queryStmt,
org.apache.doris.thrift.TQueryOptions queryOptions) {
+ public void plan(StatementBase queryStmt,
org.apache.doris.thrift.TQueryOptions queryOptions) throws UserException {
if
(statementContext.getConnectContext().getSessionVariable().isEnableNereidsTrace())
{
NereidsTracer.init();
} else {
@@ -154,6 +155,9 @@ public class NereidsPlanner extends Planner {
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
+
+ // update scan nodes visible version at the end of plan phase.
+ ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
}
@VisibleForTesting
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index db327bddcb9..a72a76b6fca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -98,6 +98,9 @@ public class OriginalPlanner extends Planner {
public void plan(StatementBase queryStmt, TQueryOptions queryOptions)
throws UserException {
createPlanFragments(queryStmt, analyzer, queryOptions);
+
+ // update scan nodes visible version at the end of plan phase.
+ ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 22495e792ff..44befc75df6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -125,5 +125,4 @@ public abstract class Planner {
public abstract Optional<ResultSet> handleQueryInFe(StatementBase
parsedStmt);
public abstract void addHook(PlannerHook hook);
-
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 21f6bb07bd7..490a72f895b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -36,15 +36,19 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
@@ -67,10 +71,12 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Representation of the common elements of all scan nodes.
@@ -738,4 +744,67 @@ public abstract class ScanNode extends PlanNode {
public boolean shouldUseOneInstance() {
return hasLimit() && conjuncts.isEmpty();
}
+
+ // In cloud mode, meta read lock is not enough to keep a snapshot of the
partition versions.
+ // After all scan node are collected, it is possible to gain a snapshot of
the partition version.
+ public static void setVisibleVersionForOlapScanNodes(List<ScanNode>
scanNodes) throws UserException {
+ if (Config.isNotCloudMode()) {
+ return;
+ }
+
+ List<CloudPartition> partitions = new ArrayList<>();
+ Set<Long> partitionSet = new HashSet<>();
+ for (ScanNode node : scanNodes) {
+ if (!(node instanceof OlapScanNode)) {
+ continue;
+ }
+
+ OlapScanNode scanNode = (OlapScanNode) node;
+ OlapTable table = scanNode.getOlapTable();
+ for (Long id : scanNode.getSelectedPartitionIds()) {
+ if (!partitionSet.contains(id)) {
+ partitionSet.add(id);
+ partitions.add((CloudPartition) table.getPartition(id));
+ }
+ }
+ }
+
+ if (partitions.isEmpty()) {
+ return;
+ }
+
+ List<Long> versions;
+ try {
+ versions = CloudPartition.getSnapshotVisibleVersion(partitions);
+ } catch (RpcException e) {
+ throw new UserException("get visible version for OlapScanNode
failed", e);
+ }
+
+ assert versions.size() == partitions.size() : "the got num versions is
not equals to acquired num versions";
+ if (versions.stream().anyMatch(x -> x <= 0)) {
+ int size = versions.size();
+ for (int i = 0; i < size; ++i) {
+ if (versions.get(i) <= 0) {
+ LOG.warn("partition {} getVisibleVersion error, the
visibleVersion is {}",
+ partitions.get(i).getId(), versions.get(i));
+ throw new UserException("partition " +
partitions.get(i).getId()
+ + " getVisibleVersion error, the visibleVersion is " +
versions.get(i));
+ }
+ }
+ }
+
+ // ATTN: the table ids are ignored here because the both id are
allocated from a same id generator.
+ Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
+ .boxed()
+ .collect(Collectors.toMap(i -> partitions.get(i).getId(),
versions::get));
+
+ for (ScanNode node : scanNodes) {
+ if (!(node instanceof OlapScanNode)) {
+ continue;
+ }
+
+ OlapScanNode scanNode = (OlapScanNode) node;
+ scanNode.updateScanRangeVersions(visibleVersionMap);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]