This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3ffc6f08354 [fix](Nereids) should do read lock on table being insert
when analyze (#25619)
3ffc6f08354 is described below
commit 3ffc6f08354ca10a796d880aa762dee7a2f3be00
Author: morrySnow <[email protected]>
AuthorDate: Fri Oct 20 21:09:19 2023 +0800
[fix](Nereids) should do read lock on table being insert when analyze
(#25619)
---
.../org/apache/doris/nereids/CascadesContext.java | 121 ++++++++++++++++-----
.../doris/nereids/rules/analysis/BindRelation.java | 2 +-
.../apache/doris/nereids/util/ReadLockTest.java | 41 +++++--
3 files changed, 123 insertions(+), 41 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index 4cd248cb8a3..19d38873f9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.analyzer.Scope;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.jobs.Job;
@@ -52,7 +54,9 @@ import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -61,6 +65,8 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.util.Lists;
import java.util.ArrayList;
import java.util.HashMap;
@@ -74,6 +80,7 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
/**
@@ -96,7 +103,7 @@ public class CascadesContext implements ScheduleContext {
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
private final RuntimeFilterContext runtimeFilterContext;
private Optional<Scope> outerScope = Optional.empty();
- private List<TableIf> tables = null;
+ private Map<Long, TableIf> tables = null;
private boolean isRewriteRoot;
private volatile boolean isTimeout = false;
@@ -213,7 +220,7 @@ public class CascadesContext implements ScheduleContext {
}
public void setTables(List<TableIf> tables) {
- this.tables = tables;
+ this.tables = tables.stream().collect(Collectors.toMap(TableIf::getId,
t -> t, (t1, t2) -> t1));
}
public ConnectContext getConnectContext() {
@@ -330,21 +337,23 @@ public class CascadesContext implements ScheduleContext {
* Extract tables.
*/
public void extractTables(LogicalPlan logicalPlan) {
- Set<UnboundRelation> relations = getTables(logicalPlan);
- tables = new ArrayList<>();
- for (UnboundRelation r : relations) {
+ Set<List<String>> tableNames = getTables(logicalPlan);
+ tables = Maps.newHashMap();
+ for (List<String> tableName : tableNames) {
try {
- tables.add(getTable(r));
+ TableIf table = getTable(tableName);
+ tables.put(table.getId(), table);
} catch (Throwable e) {
// IGNORE
}
}
+
}
/** get table by table name, try to get from information from dumpfile
first */
public TableIf getTableInMinidumpCache(String tableName) {
- Preconditions.checkState(tables != null);
- for (TableIf table : tables) {
+ Preconditions.checkState(tables != null, "tables should not be null");
+ for (TableIf table : tables.values()) {
if (table.getName().equals(tableName)) {
return table;
}
@@ -356,45 +365,101 @@ public class CascadesContext implements ScheduleContext {
}
public List<TableIf> getTables() {
- return tables;
+ if (tables == null) {
+ return null;
+ } else {
+ return Lists.newArrayList(tables.values());
+ }
}
- private Set<UnboundRelation> getTables(LogicalPlan logicalPlan) {
- Set<UnboundRelation> unboundRelations = new HashSet<>();
+ private Set<List<String>> getTables(LogicalPlan logicalPlan) {
+ final Set<List<String>> tableNames = new HashSet<>();
logicalPlan.foreach(p -> {
if (p instanceof LogicalFilter) {
-
unboundRelations.addAll(extractUnboundRelationFromFilter((LogicalFilter<?>) p));
+
tableNames.addAll(extractTableNamesFromFilter((LogicalFilter<?>) p));
} else if (p instanceof LogicalCTE) {
-
unboundRelations.addAll(extractUnboundRelationFromCTE((LogicalCTE<?>) p));
+ tableNames.addAll(extractTableNamesFromCTE((LogicalCTE<?>) p));
+ } else if (p instanceof LogicalProject) {
+
tableNames.addAll(extractTableNamesFromProject((LogicalProject<?>) p));
+ } else if (p instanceof LogicalHaving) {
+
tableNames.addAll(extractTableNamesFromHaving((LogicalHaving<?>) p));
+ } else if (p instanceof UnboundOneRowRelation) {
+
tableNames.addAll(extractTableNamesFromOneRowRelation((UnboundOneRowRelation)
p));
} else {
-
unboundRelations.addAll(p.collect(UnboundRelation.class::isInstance));
+ Set<LogicalPlan> logicalPlans = p.collect(
+ n -> (n instanceof UnboundRelation || n instanceof
UnboundOlapTableSink));
+ for (LogicalPlan plan : logicalPlans) {
+ if (plan instanceof UnboundRelation) {
+ tableNames.add(((UnboundRelation)
plan).getNameParts());
+ } else if (plan instanceof UnboundOlapTableSink) {
+ tableNames.add(((UnboundOlapTableSink<?>)
plan).getNameParts());
+ } else {
+ throw new AnalysisException("get tables from plan
failed. meet unknown type node " + plan);
+ }
+ }
}
});
- return unboundRelations;
+ return tableNames;
+ }
+
+ private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?>
having) {
+ Set<SubqueryExpr> subqueryExprs = having.getPredicate()
+ .collect(SubqueryExpr.class::isInstance);
+ Set<List<String>> tableNames = new HashSet<>();
+ for (SubqueryExpr expr : subqueryExprs) {
+ LogicalPlan plan = expr.getQueryPlan();
+ tableNames.addAll(getTables(plan));
+ }
+ return tableNames;
+ }
+
+ private Set<List<String>>
extractTableNamesFromOneRowRelation(UnboundOneRowRelation oneRowRelation) {
+ Set<SubqueryExpr> subqueryExprs = oneRowRelation.getProjects().stream()
+ .<Set<SubqueryExpr>>map(p ->
p.collect(SubqueryExpr.class::isInstance))
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
+ Set<List<String>> tableNames = new HashSet<>();
+ for (SubqueryExpr expr : subqueryExprs) {
+ LogicalPlan plan = expr.getQueryPlan();
+ tableNames.addAll(getTables(plan));
+ }
+ return tableNames;
+ }
+
+ private Set<List<String>> extractTableNamesFromProject(LogicalProject<?>
project) {
+ Set<SubqueryExpr> subqueryExprs = project.getProjects().stream()
+ .<Set<SubqueryExpr>>map(p ->
p.collect(SubqueryExpr.class::isInstance))
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
+ Set<List<String>> tableNames = new HashSet<>();
+ for (SubqueryExpr expr : subqueryExprs) {
+ LogicalPlan plan = expr.getQueryPlan();
+ tableNames.addAll(getTables(plan));
+ }
+ return tableNames;
}
- private Set<UnboundRelation>
extractUnboundRelationFromFilter(LogicalFilter<?> filter) {
+ private Set<List<String>> extractTableNamesFromFilter(LogicalFilter<?>
filter) {
Set<SubqueryExpr> subqueryExprs = filter.getPredicate()
.collect(SubqueryExpr.class::isInstance);
- Set<UnboundRelation> relations = new HashSet<>();
+ Set<List<String>> tableNames = new HashSet<>();
for (SubqueryExpr expr : subqueryExprs) {
LogicalPlan plan = expr.getQueryPlan();
- relations.addAll(getTables(plan));
+ tableNames.addAll(getTables(plan));
}
- return relations;
+ return tableNames;
}
- private Set<UnboundRelation> extractUnboundRelationFromCTE(LogicalCTE<?>
cte) {
+ private Set<List<String>> extractTableNamesFromCTE(LogicalCTE<?> cte) {
List<LogicalSubQueryAlias<Plan>> subQueryAliases =
cte.getAliasQueries();
- Set<UnboundRelation> relations = new HashSet<>();
+ Set<List<String>> tableNames = new HashSet<>();
for (LogicalSubQueryAlias<Plan> subQueryAlias : subQueryAliases) {
- relations.addAll(getTables(subQueryAlias));
+ tableNames.addAll(getTables(subQueryAlias));
}
- return relations;
+ return tableNames;
}
- private TableIf getTable(UnboundRelation unboundRelation) {
- List<String> nameParts = unboundRelation.getNameParts();
+ private TableIf getTable(List<String> nameParts) {
switch (nameParts.size()) {
case 1: { // table
String ctlName =
getConnectContext().getEnv().getCurrentCatalog().getName();
@@ -413,7 +478,7 @@ public class CascadesContext implements ScheduleContext {
return getTable(nameParts.get(0), nameParts.get(1),
nameParts.get(2), getConnectContext().getEnv());
}
default:
- throw new IllegalStateException("Table name [" +
unboundRelation.getTableName() + "] is invalid.");
+ throw new IllegalStateException("Table name [" +
String.join(".", nameParts) + "] is invalid.");
}
}
@@ -455,10 +520,10 @@ public class CascadesContext implements ScheduleContext {
public Lock(LogicalPlan plan, CascadesContext cascadesContext) {
this.cascadesContext = cascadesContext;
// tables can also be load from dump file
- if (cascadesContext.getTables() == null) {
+ if (cascadesContext.tables == null) {
cascadesContext.extractTables(plan);
}
- for (TableIf table : cascadesContext.tables) {
+ for (TableIf table : cascadesContext.tables.values()) {
if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
close();
throw new RuntimeException(String.format("Failed to get
read lock on table: %s", table.getName()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index f97057e105d..689763b6f79 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -140,7 +140,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
List<String> tableQualifier =
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
unboundRelation.getNameParts());
TableIf table = null;
- if (cascadesContext.getTables() != null) {
+ if (!CollectionUtils.isEmpty(cascadesContext.getTables())) {
table = cascadesContext.getTableInMinidumpCache(tableName);
}
if (table == null) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
index 0db39e9e11d..58212c2d3ba 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
@@ -17,14 +17,15 @@
package org.apache.doris.nereids.util;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -47,10 +48,10 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
- List<Table> f = (List<Table>)
Deencapsulation.getField(cascadesContext, "tables");
- Assertions.assertEquals(1, f.size());
- Assertions.assertEquals("supplier",
f.stream().map(Table::getName).findFirst().get());
+ List<TableIf> f = cascadesContext.getTables();
+ Assertions.assertEquals(1, f.size());
+ Assertions.assertEquals("supplier",
f.stream().map(TableIf::getName).findFirst().get());
}
@Test
@@ -69,9 +70,9 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
- List<Table> f = (List<Table>)
Deencapsulation.getField(cascadesContext, "tables");
+ List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(1, f.size());
- Assertions.assertEquals("supplier",
f.stream().map(Table::getName).findFirst().get());
+ Assertions.assertEquals("supplier",
f.stream().map(TableIf::getName).findFirst().get());
}
@Test
@@ -84,10 +85,9 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
- List<Table> f = (List<Table>)
Deencapsulation.getField(cascadesContext, "tables");
-
+ List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(1, f.size());
- Assertions.assertEquals("supplier",
f.stream().map(Table::getName).findFirst().get());
+ Assertions.assertEquals("supplier",
f.stream().map(TableIf::getName).findFirst().get());
}
@Test
@@ -100,11 +100,28 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
- List<Table> f = (List<Table>)
Deencapsulation.getField(cascadesContext, "tables");
+ List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(2, f.size());
- Set<String> tableNames =
f.stream().map(Table::getName).collect(Collectors.toSet());
+ Set<String> tableNames =
f.stream().map(TableIf::getName).collect(Collectors.toSet());
Assertions.assertTrue(tableNames.contains("supplier"));
Assertions.assertTrue(tableNames.contains("lineorder"));
+ }
+ @Test
+ public void testInsertInto() {
+ String sql = "INSERT INTO supplier(s_suppkey) SELECT lo_orderkey FROM
lineorder";
+ StatementContext statementContext =
MemoTestUtils.createStatementContext(connectContext, sql);
+ InsertIntoTableCommand insertIntoTableCommand =
(InsertIntoTableCommand) parser.parseSingle(sql);
+ NereidsPlanner planner = new NereidsPlanner(statementContext);
+ planner.plan(
+ (LogicalPlan)
insertIntoTableCommand.getExplainPlan(connectContext),
+ PhysicalProperties.ANY
+ );
+ CascadesContext cascadesContext = planner.getCascadesContext();
+ List<TableIf> f = cascadesContext.getTables();
+ Assertions.assertEquals(2, f.size());
+ Set<String> tableNames =
f.stream().map(TableIf::getName).collect(Collectors.toSet());
+ Assertions.assertTrue(tableNames.contains("supplier"));
+ Assertions.assertTrue(tableNames.contains("lineorder"));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]