This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 87b9425772 [Bug](materialized-view) fix where clause not analyzed after fe restart (#22268)
87b9425772 is described below
commit 87b94257727056b890fc98689e0c2b6e14169f13
Author: Pxl <[email protected]>
AuthorDate: Thu Jul 27 18:34:44 2023 +0800
[Bug](materialized-view) fix where clause not analyzed after fe restart (#22268)
fix where clause not analyzed after fe restart
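In short, the patch threads the statement's Analyzer into OlapTableSink.complete(Analyzer) so that a materialized view's where clause, which comes back unanalyzed after an FE restart, gets its slots re-bound and the expression re-analyzed before it is serialized to thrift. A minimal sketch of that step, assuming the Doris FE classes are on the classpath; the wrapper class and method name below are illustrative and not part of the patch:

    import org.apache.doris.analysis.Analyzer;
    import org.apache.doris.analysis.Expr;
    import org.apache.doris.analysis.TupleDescriptor;
    import org.apache.doris.catalog.MaterializedIndexMeta;
    import org.apache.doris.catalog.OlapTable;
    import org.apache.doris.common.AnalysisException;

    // Illustrative wrapper (not in the patch) mirroring what OlapTableSink.createSchema
    // now does for each index that carries a materialized-view where clause.
    final class WhereClauseReanalysisSketch {
        static Expr rebindAndAnalyze(MaterializedIndexMeta indexMeta, OlapTable table,
                TupleDescriptor tupleDescriptor, Analyzer analyzer) throws AnalysisException {
            // Work on a copy so the clause cached in the index meta is never mutated.
            Expr expr = indexMeta.getWhereClause().clone();
            // Re-bind the clause's slot refs to the sink's tuple descriptor.
            expr.replaceSlot(tupleDescriptor);
            if (analyzer != null) { // unit tests pass null and skip analysis (see OlapTableSinkTest)
                tupleDescriptor.setTable(table);
                // New helper added by this patch: expose the tuple and its slots to the analyzer.
                analyzer.registerTupleDescriptor(tupleDescriptor);
                // The step that was missing after an FE restart: analyze before serializing.
                expr.analyze(analyzer);
            }
            // Callers serialize the analyzed clause via expr.treeToThrift().
            return expr;
        }
    }

Callers that already run inside an analyzed statement (NativeInsertStmt, LoadingTaskPlanner, StreamLoadPlanner) pass their existing analyzer; the Nereids insert path builds a fresh one with new Analyzer(Env.getCurrentEnv(), ctx).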
---
.../java/org/apache/doris/analysis/Analyzer.java | 15 +++---
.../apache/doris/analysis/NativeInsertStmt.java | 2 +-
.../doris/load/loadv2/LoadingTaskPlanner.java | 2 +-
.../plans/commands/InsertIntoTableCommand.java | 3 +-
.../org/apache/doris/planner/OlapTableSink.java | 12 +++--
.../apache/doris/planner/StreamLoadPlanner.java | 4 +-
.../apache/doris/planner/OlapTableSinkTest.java | 8 +--
.../test_partial_update_schema_change.groovy | 61 +++++++++++++++++-----
8 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 1e44278eef..761aae1442 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -146,9 +146,6 @@ public class Analyzer {
private String schemaWild;
private String schemaTable; // table used in DESCRIBE Table
- // True if the corresponding select block has a limit and/or offset clause.
- private boolean hasLimitOffsetClause = false;
-
// Current depth of nested analyze() calls. Used for enforcing a
// maximum expr-tree depth. Needs to be manually maintained by the user
// of this Analyzer with incrementCallDepth() and decrementCallDepth().
@@ -657,6 +654,14 @@ public class Analyzer {
}
}
+ public void registerTupleDescriptor(TupleDescriptor desc) {
+ tupleByAlias.put(desc.getAlias(), desc);
+ for (SlotDescriptor slot : desc.getSlots()) {
+ String key = desc.getAlias() + "." + slot.getColumn().getName();
+ slotRefMap.put(key, slot);
+ }
+ }
+
/**
* Creates an returns an empty TupleDescriptor for the given table ref and registers
* it against all its legal aliases. For tables refs with an explicit alias, only the
@@ -1818,10 +1823,6 @@ public class Analyzer {
return hasEmptySpjResultSet;
}
- public void setHasLimitOffsetClause(boolean hasLimitOffset) {
- this.hasLimitOffsetClause = hasLimitOffset;
- }
-
/**
* Register all conjuncts in 'conjuncts' that make up the On-clause of the given
* right-hand side of a join. Assigns each conjunct a unique id. If rhsRef is
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 02b6e40f14..adf12e0ddf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -849,7 +849,7 @@ public class NativeInsertStmt extends InsertStmt {
public void complete() throws UserException {
if (!isExplain() && targetTable instanceof OlapTable) {
- ((OlapTableSink) dataSink).complete();
+ ((OlapTableSink) dataSink).complete(analyzer);
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), transactionId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 982eb813e6..fa86ab06b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -171,7 +171,7 @@ public class LoadingTaskPlanner {
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
        Config.enable_single_replica_load);
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode);
- olapTableSink.complete();
+ olapTableSink.complete(analyzer);
// 3. Plan fragment
PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index ab373c7886..3155eab2e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands;
+import org.apache.doris.analysis.Analyzer;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager.ProfileType;
@@ -123,7 +124,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
ctx.getExecTimeout(),
ctx.getSessionVariable().getSendBatchParallelism(), false, false);
- sink.complete();
+ sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(
        physicalOlapTableSink.getDatabase().getId(), txn.getTxnId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 0bc30f978a..b09ebcff7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -17,6 +17,7 @@
package org.apache.doris.planner;
+import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
@@ -164,7 +165,7 @@ public class OlapTableSink extends DataSink {
}
// must called after tupleDescriptor is computed
- public void complete() throws UserException {
+ public void complete(Analyzer analyzer) throws UserException {
TOlapTableSink tSink = tDataSink.getOlapTableSink();
tSink.setTableId(dstTable.getId());
@@ -176,7 +177,7 @@ public class OlapTableSink extends DataSink {
}
tSink.setNumReplicas(numReplicas);
tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
- tSink.setSchema(createSchema(tSink.getDbId(), dstTable));
+ tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
tSink.setPartition(createPartition(tSink.getDbId(), dstTable));
List<TOlapTableLocationParam> locationParams = createLocation(dstTable);
tSink.setLocation(locationParams.get(0));
@@ -214,7 +215,7 @@ public class OlapTableSink extends DataSink {
return tDataSink;
}
- private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) {
+ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException {
TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam();
schemaParam.setDbId(dbId);
schemaParam.setTableId(table.getId());
@@ -253,6 +254,11 @@ public class OlapTableSink extends DataSink {
if (indexMeta.getWhereClause() != null) {
Expr expr = indexMeta.getWhereClause().clone();
expr.replaceSlot(tupleDescriptor);
+ if (analyzer != null) {
+ tupleDescriptor.setTable(table);
+ analyzer.registerTupleDescriptor(tupleDescriptor);
+ expr.analyze(analyzer);
+ }
indexSchema.setWhereClause(expr.treeToThrift());
}
indexSchema.setColumnsDesc(columnsDesc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 40d5b99f34..ac5d6c595c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -266,7 +266,7 @@ public class StreamLoadPlanner {
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
        taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
- olapTableSink.complete();
+ olapTableSink.complete(analyzer);
// for stream load, we only need one fragment, ScanNode -> DataSink.
// OlapTableSink can dispatch data to corresponding node.
@@ -476,7 +476,7 @@ public class StreamLoadPlanner {
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
        taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
- olapTableSink.complete();
+ olapTableSink.complete(analyzer);
// for stream load, we only need one fragment, ScanNode -> DataSink.
// OlapTableSink can dispatch data to corresponding node.
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
index c6da7304d5..dc98026a00 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
@@ -108,7 +108,7 @@ public class OlapTableSinkTest {
dstTable.getPartitionInfo().setIsMutable(partition.getId(), true);
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
- sink.complete();
+ sink.complete(null);
LOG.info("sink is {}", sink.toThrift());
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
}
@@ -146,7 +146,7 @@ public class OlapTableSinkTest {
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
try {
- sink.complete();
+ sink.complete(null);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
@@ -170,7 +170,7 @@ public class OlapTableSinkTest {
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
- sink.complete();
+ sink.complete(null);
LOG.info("sink is {}", sink.toThrift());
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
}
@@ -208,7 +208,7 @@ public class OlapTableSinkTest {
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
try {
- sink.complete();
+ sink.complete(null);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
index 905c622159..9883f19dcd 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
@@ -64,12 +64,15 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' "
+ def try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data without new column
@@ -174,12 +177,15 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} DROP COLUMN c8 "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data without delete column
@@ -283,12 +289,15 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} MODIFY COLUMN c2 double "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data with update column
@@ -358,20 +367,27 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} ADD COLUMN c1 int key null "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
+
sql " ALTER table ${tableName} ADD COLUMN c2 int null "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data with all key column
@@ -451,12 +467,15 @@ suite("test_partial_update_schema_change", "p0") {
qt_sql10 " select * from ${tableName} order by c0 "
sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
//test load data with create index
@@ -605,12 +624,15 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data without new column
@@ -714,12 +736,15 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} DROP COLUMN c8 "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data without delete column
@@ -822,12 +847,15 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} MODIFY COLUMN c2 double "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data with update column
@@ -896,20 +924,26 @@ suite("test_partial_update_schema_change", "p0") {
// schema change
sql " ALTER table ${tableName} ADD COLUMN c1 int key null "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
sql " ALTER table ${tableName} ADD COLUMN c2 int null "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
// test load data with all key column
@@ -987,12 +1021,15 @@ suite("test_partial_update_schema_change", "p0") {
qt_sql23 " select * from ${tableName} order by c0 "
sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP "
+ try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(1000)
if(res[0][9].toString() == "FINISHED"){
break;
}
- Thread.sleep(500)
+ assert(try_times>0)
+ try_times--
}
//test load data with create index
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]