This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d151718 [MaterializedView] Fix bug that preAggregation is different
between old and new selector (#3018)
d151718 is described below
commit d151718e98fc51f9d9a4f5fa9f05470f3c135bd2
Author: EmmyMiao87 <[email protected]>
AuthorDate: Mon Mar 2 19:11:10 2020 +0800
[MaterializedView] Fix bug that preAggregation is different between old and
new selector (#3018)
If there is no aggregated column in an aggregate index, the index acts as a
deduplicated table.
For example:
aggregate table (k1, k2, v1 sum)
mv index (k1, k2)
This kind of index is SPJG, which is the same as `select k1, k2 from
aggregate_table group by k1, k2`.
Its grouping columns also need to be checked using the following steps.
If there is no aggregated column in a duplicate index, the index is SPJ,
which passes the grouping verification directly.
Also, after the candidate indexes are compensated, the output columns of the
new candidate indexes should be checked as well.
---
.../java/org/apache/doris/alter/AlterHandler.java | 3 +-
.../java/org/apache/doris/catalog/OlapTable.java | 8 +++-
.../main/java/org/apache/doris/common/Config.java | 8 ++--
.../java/org/apache/doris/common/FeConstants.java | 3 ++
.../load/routineload/RoutineLoadScheduler.java | 4 +-
.../doris/planner/MaterializedViewSelector.java | 43 ++++++++++++++++------
.../org/apache/doris/planner/OlapScanNode.java | 22 ++++++++++-
.../org/apache/doris/planner/RollupSelector.java | 17 +++------
.../apache/doris/planner/SingleNodePlanner.java | 4 ++
.../planner/MaterializedViewFunctionTest.java | 20 +++++++++-
.../planner/MaterializedViewSelectorTest.java | 33 +++++++++++++----
.../java/org/apache/doris/utframe/DorisAssert.java | 28 +++++++++++---
12 files changed, 145 insertions(+), 48 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
index 9e630c2..4a47d68 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
@@ -83,7 +84,7 @@ public abstract class AlterHandler extends MasterDaemon {
}
public AlterHandler(String name) {
- super(name, 10000);
+ super(name, FeConstants.default_scheduler_interval_millisecond);
}
protected void addAlterJobV2(AlterJobV2 alterJob) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index 3337ffd..5a4785f 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -345,14 +345,18 @@ public class OlapTable extends Table {
public Map<Long, List<Column>> getVisibleIndexIdToSchema() {
Map<Long, List<Column>> visibleMVs = Maps.newHashMap();
- Partition partition =
idToPartition.values().stream().findFirst().get();
- List<MaterializedIndex> mvs =
partition.getMaterializedIndices(IndexExtState.VISIBLE);
+ List<MaterializedIndex> mvs = getVisibleIndex();
for (MaterializedIndex mv : mvs) {
visibleMVs.put(mv.getId(), indexIdToSchema.get(mv.getId()));
}
return visibleMVs;
}
+ public List<MaterializedIndex> getVisibleIndex() {
+ Partition partition =
idToPartition.values().stream().findFirst().get();
+ return partition.getMaterializedIndices(IndexExtState.VISIBLE);
+ }
+
// this is only for schema change.
public void renameIndexForSchemaChange(String name, String newName) {
long idxId = indexNameToId.remove(name);
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java
b/fe/src/main/java/org/apache/doris/common/Config.java
index ee335f3..6018b4c 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -361,9 +361,11 @@ public class Config extends ConfigBase {
public static int max_layout_length_per_row = 100000; // 100k
/*
- * Load checker's running interval.
- * A load job will transfer its state from PENDING to ETL to LOADING to
FINISHED.
- * So a load job will cost at least 3 check intervals to finish.
+ * The load scheduler running interval.
+ * A load job will transfer its state from PENDING to LOADING to FINISHED.
+ * The load scheduler will transfer load job from PENDING to LOADING
+ * while the txn callback will transfer load job from LOADING to
FINISHED.
+ * So a load job will cost at most one interval to finish when the
concurrency has not reached the upper limit.
*/
@ConfField public static int load_checker_interval_second = 5;
diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/src/main/java/org/apache/doris/common/FeConstants.java
index 93e3ba4..e4fef6f 100644
--- a/fe/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -44,6 +44,9 @@ public class FeConstants {
// set to true to skip some step when running FE unit test
public static boolean runningUnitTest = false;
+ // default scheduler interval is 10 seconds
+ public static int default_scheduler_interval_millisecond = 10000;
+
// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_CURRENT;
diff --git
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 9f26f9e..35057be 100644
---
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -18,6 +18,7 @@
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@@ -36,7 +37,6 @@ import java.util.List;
public class RoutineLoadScheduler extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(RoutineLoadScheduler.class);
- private static final int DEFAULT_INTERVAL_SECONDS = 10;
private RoutineLoadManager routineLoadManager;
@@ -47,7 +47,7 @@ public class RoutineLoadScheduler extends MasterDaemon {
}
public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) {
- super("Routine load scheduler", DEFAULT_INTERVAL_SECONDS * 1000);
+ super("Routine load scheduler",
FeConstants.default_scheduler_interval_millisecond);
this.routineLoadManager = routineLoadManager;
}
diff --git
a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
index f8624e1..8572ea4 100644
--- a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
@@ -122,7 +122,7 @@ public class MaterializedViewSelector {
// Step2: check all columns in compensating predicates are available
in the view output
checkCompensatingPredicates(columnNamesInPredicates.get(tableName),
candidateIndexIdToSchema);
// Step3: group by list in query is the subset of group by list in
view or view contains no aggregation
- checkGrouping(columnNamesInGrouping.get(tableName),
candidateIndexIdToSchema);
+ checkGrouping(columnNamesInGrouping.get(tableName),
candidateIndexIdToSchema, table.getKeysType());
// Step4: aggregation functions are available in the view output
checkAggregationFunction(aggregateColumnsInQuery.get(tableName),
candidateIndexIdToSchema);
// Step5: columns required to compute output expr are available in the
view output
@@ -144,8 +144,9 @@ public class MaterializedViewSelector {
* So, we need to compensate those kinds of index in following
step.
*
*/
- compensateIndex(candidateIndexIdToSchema,
scanNode.getOlapTable().getVisibleIndexIdToSchema(),
-
table.getSchemaByIndexId(table.getBaseIndexId()).size());
+ compensateCandidateIndex(candidateIndexIdToSchema,
scanNode.getOlapTable().getVisibleIndexIdToSchema(),
+ table);
+ checkOutputColumns(columnNamesInQueryOutput.get(tableName),
candidateIndexIdToSchema);
}
return candidateIndexIdToSchema;
}
@@ -279,7 +280,8 @@ public class MaterializedViewSelector {
* @param candidateIndexIdToSchema
*/
- private void checkGrouping(Set<String> columnsInGrouping, Map<Long,
List<Column>> candidateIndexIdToSchema) {
+ private void checkGrouping(Set<String> columnsInGrouping, Map<Long,
List<Column>> candidateIndexIdToSchema,
+ KeysType keysType) {
Iterator<Map.Entry<Long, List<Column>>> iterator =
candidateIndexIdToSchema.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, List<Column>> entry = iterator.next();
@@ -287,8 +289,23 @@ public class MaterializedViewSelector {
List<Column> candidateIndexSchema = entry.getValue();
candidateIndexSchema.stream().filter(column ->
!column.isAggregated())
.forEach(column ->
indexNonAggregatedColumnNames.add(column.getName()));
- // When the candidate index is SPJ type, it passes the
verification directly
- if (indexNonAggregatedColumnNames.size() ==
candidateIndexSchema.size()) {
+ /*
+ If there is no aggregated column in duplicate table, the index
will be SPJ.
+ For example:
+ duplicate table (k1, k2, v1)
+ mv index (k1, v1)
+ When the candidate index is SPJ type, it passes the verification
directly
+
+ If there is no aggregated column in aggregate index, the index
will be deduplicate table.
+ For example:
+ aggregate table (k1, k2, v1 sum)
+ mv index (k1, k2)
+ This kind of index is SPJG which same as select k1, k2 from
aggregate_table group by k1, k2.
+ It also need to check the grouping column using following steps.
+
+ ISSUE-3016, MaterializedViewFunctionTest: testDeduplicateQueryInAgg
+ */
+ if (indexNonAggregatedColumnNames.size() ==
candidateIndexSchema.size() && keysType == KeysType.DUP_KEYS) {
continue;
}
// When the query is SPJ type but the candidate index is SPJG
type, it will not pass directly.
@@ -366,14 +383,16 @@ public class MaterializedViewSelector {
+
Joiner.on(",").join(candidateIndexIdToSchema.keySet()));
}
- private void compensateIndex(Map<Long, List<Column>>
candidateIndexIdToSchema,
+ private void compensateCandidateIndex(Map<Long, List<Column>>
candidateIndexIdToSchema,
Map<Long, List<Column>> allVisibleIndexes,
- int sizeOfBaseIndexSchema) {
+ OlapTable table) {
isPreAggregation = false;
reasonOfDisable = "The aggregate operator does not match";
+ int keySizeOfBaseIndex =
table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
for (Map.Entry<Long, List<Column>> index :
allVisibleIndexes.entrySet()) {
- if (index.getValue().size() == sizeOfBaseIndexSchema) {
- candidateIndexIdToSchema.put(index.getKey(), index.getValue());
+ long mvIndexId = index.getKey();
+ if (table.getKeyColumnsByIndexId(mvIndexId).size() ==
keySizeOfBaseIndex) {
+ candidateIndexIdToSchema.put(mvIndexId, index.getValue());
}
}
LOG.debug("Those mv pass the test of output columns:"
@@ -415,7 +434,6 @@ public class MaterializedViewSelector {
Expr aggChild0 = aggExpr.getChild(0);
if (aggChild0 instanceof SlotRef) {
SlotRef slotRef = (SlotRef) aggChild0;
- Preconditions.checkState(slotRef.getColumnName() != null);
Table table = slotRef.getDesc().getParent().getTable();
/* If this column come from subquery, the parent table
will be null
* For example: select k1 from (select name as k1 from
tableA) A
@@ -424,11 +442,11 @@ public class MaterializedViewSelector {
if (table == null) {
continue;
}
+ Preconditions.checkState(slotRef.getColumnName() != null);
addAggregatedColumn(slotRef.getColumnName(),
aggExpr.getFnName().getFunction(),
table.getName());
} else if ((aggChild0 instanceof CastExpr) &&
(aggChild0.getChild(0) instanceof SlotRef)) {
SlotRef slotRef = (SlotRef) aggChild0.getChild(0);
- Preconditions.checkState(slotRef.getColumnName() != null);
Table table = slotRef.getDesc().getParent().getTable();
/*
* Same as above
@@ -436,6 +454,7 @@ public class MaterializedViewSelector {
if (table == null) {
continue;
}
+ Preconditions.checkState(slotRef.getColumnName() != null);
addAggregatedColumn(slotRef.getColumnName(),
aggExpr.getFnName().getFunction(),
table.getName());
} else {
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 1c5dfab..83849bd 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -43,6 +43,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -85,6 +86,23 @@ public class OlapScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(OlapScanNode.class);
private List<TScanRangeLocations> result = new ArrayList<>();
+ /*
+ * When the field value is ON, the storage engine can return the data
directly without pre-aggregation.
+ * When the field value is OFF, the storage engine needs to aggregate the
data before returning to scan node.
+ * For example:
+ * Aggregate table: k1, k2, v1 sum
+ * Field value is ON
+ * Query1: select k1, sum(v1) from table group by k1
+ * This aggregation function in query is same as the schema.
+ * So the field value is ON while the query can scan data directly.
+ *
+ * Field value is OFF
+ * Query1: select k1 , k2 from table
+ * This aggregation info is null.
+ * Query2: select k1, min(v1) from table group by k1
+ * This aggregation function in query is min which different from the
schema.
+ * So the data stored in storage engine need to be merged firstly before
returning to scan node.
+ */
private boolean isPreAggregation = false;
private String reasonOfPreAggregation = null;
private boolean canTurnOnPreAggr = true;
@@ -416,7 +434,9 @@ public class OlapScanNode extends ScanNode {
selectedPartitionNum = selectedPartitionIds.size();
LOG.debug("partition prune cost: {} ms, partitions: {}",
(System.currentTimeMillis() - start), selectedPartitionIds);
- if (selectedPartitionIds.size() == 0) {
+ // The fe unit test need to check the selected index id without any
data.
+ // So the step2 needs to be continued although selectedPartitionIds is
0.
+ if (selectedPartitionIds.size() == 0 && !FeConstants.runningUnitTest) {
return;
}
diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
index d303c29..fc76a86 100644
--- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
@@ -28,9 +28,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
@@ -65,14 +63,10 @@ public final class RollupSelector {
public long selectBestRollup(
Collection<Long> partitionIds, List<Expr> conjuncts, boolean
isPreAggregation)
throws UserException {
- Preconditions.checkArgument(partitionIds != null &&
!partitionIds.isEmpty(),
- "Paritition can't be null or empty.");
+ Preconditions.checkArgument(partitionIds != null , "Paritition can't
be null.");
// Get first partition to select best prefix index rollups, because
MaterializedIndex ids in one rollup's partitions are all same.
final List<Long> bestPrefixIndexRollups =
- selectBestPrefixIndexRollup(
- table.getPartition(partitionIds.iterator().next()),
- conjuncts,
- isPreAggregation);
+ selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds);
}
@@ -121,8 +115,7 @@ public final class RollupSelector {
return selectedIndexId;
}
- private List<Long> selectBestPrefixIndexRollup(
- Partition partition, List<Expr> conjuncts, boolean
isPreAggregation) throws UserException {
+ private List<Long> selectBestPrefixIndexRollup(List<Expr> conjuncts,
boolean isPreAggregation) throws UserException {
final List<String> outputColumns = Lists.newArrayList();
for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
@@ -130,12 +123,12 @@ public final class RollupSelector {
outputColumns.add(col.getName());
}
- final List<MaterializedIndex> rollups =
partition.getMaterializedIndices(IndexExtState.VISIBLE);
+ final List<MaterializedIndex> rollups = table.getVisibleIndex();
LOG.debug("num of rollup(base included): {}, pre aggr: {}",
rollups.size(), isPreAggregation);
// 1. find all rollup indexes which contains all tuple columns
final List<MaterializedIndex> rollupsContainsOutput =
Lists.newArrayList();
- final List<Column> baseTableColumns =
table.getKeyColumnsByIndexId(partition.getBaseIndex().getId());
+ final List<Column> baseTableColumns =
table.getKeyColumnsByIndexId(table.getBaseIndexId());
for (MaterializedIndex rollup : rollups) {
final Set<String> rollupColumns = Sets.newHashSet();
table.getSchemaByIndexId(rollup.getId())
diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 5b7b012..ca88171 100644
--- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -54,6 +54,7 @@ import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
@@ -769,6 +770,9 @@ public class SingleNodePlanner {
continue;
}
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
+ if (olapScanNode.getSelectedPartitionIds().size() == 0 &&
!FeConstants.runningUnitTest) {
+ continue;
+ }
MaterializedViewSelector.BestIndexInfo bestIndexInfo =
materializedViewSelector.selectBestMV
(olapScanNode);
olapScanNode.updateScanRangeInfoByNewMVSelector(bestIndexInfo.getBestIndexId(),
bestIndexInfo.isPreAggregation(),
diff --git
a/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
b/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
index 7bee2a8..80e6ad4 100644
---
a/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
+++
b/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.planner;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;
@@ -48,6 +49,8 @@ public class MaterializedViewFunctionTest {
@BeforeClass
public static void beforeClass() throws Exception {
+ FeConstants.default_scheduler_interval_millisecond = 10;
+ FeConstants.runningUnitTest = true;
UtFrameUtils.createMinDorisCluster(runningDir);
dorisAssert = new DorisAssert();
dorisAssert.withEnableMV().withDatabase(HR_DB_NAME).useDatabase(HR_DB_NAME);
@@ -56,11 +59,13 @@ public class MaterializedViewFunctionTest {
@Before
public void beforeMethod() throws Exception {
String createTableSQL = "create table " + HR_DB_NAME + "." +
EMPS_TABLE_NAME + " (empid int, name varchar, "
- + "deptno int, salary int, commission int) "
+ + "deptno int, salary int, commission int) partition by range
(empid) "
+ + "(partition p1 values less than MAXVALUE) "
+ "distributed by hash(empid) buckets 3
properties('replication_num' = '1');";
dorisAssert.withTable(createTableSQL);
createTableSQL = "create table " + HR_DB_NAME + "." + DEPTS_TABLE_NAME
- + " (deptno int, name varchar, cost int) "
+ + " (deptno int, name varchar, cost int) partition by range
(deptno) "
+ + "(partition p1 values less than MAXVALUE) "
+ "distributed by hash(deptno) buckets 3
properties('replication_num' = '1');";
dorisAssert.withTable(createTableSQL);
}
@@ -568,4 +573,15 @@ public class MaterializedViewFunctionTest {
" deptno from " + EMPS_TABLE_NAME + " where empid <0;";
dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV,
2);
}
+
+ @Test
+ public void testDeduplicateQueryInAgg() throws Exception {
+ String aggregateTable = "create table agg_table (k1 int, k2 int, v1
bigint sum) aggregate key (k1, k2) "
+ + "distributed by hash(k1) buckets 3
properties('replication_num' = '1');";
+ dorisAssert.withTable(aggregateTable);
+ String createRollupSQL = "alter table agg_table add rollup old_key
(k1, k2) "
+ + "properties ('replication_num' = '1');";
+ String query = "select k1, k2 from agg_table;";
+
dorisAssert.withRollup(createRollupSQL).query(query).explainContains("OFF",
"old_key");
+ }
}
diff --git
a/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
b/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
index 12aa5ae..25d3548 100644
---
a/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
+++
b/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
@@ -27,6 +27,8 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.jmockit.Deencapsulation;
@@ -35,6 +37,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.builder.ToStringExclude;
import org.junit.Assert;
import org.junit.Test;
@@ -231,7 +234,8 @@ public class MaterializedViewSelectorTest {
MaterializedViewSelector selector = new
MaterializedViewSelector(selectStmt, analyzer);
Deencapsulation.setField(selector, "isSPJQuery", false);
- Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames,
candidateIndexIdToSchema);
+ Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames,
candidateIndexIdToSchema,
+ KeysType.DUP_KEYS);
Assert.assertEquals(2, candidateIndexIdToSchema.size());
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(1)));
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(2)));
@@ -318,37 +322,50 @@ public class MaterializedViewSelectorTest {
}
@Test
- public void testCompensateIndex(@Injectable SelectStmt selectStmt,
@Injectable Analyzer analyzer) {
+ public void testCompensateIndex(@Injectable SelectStmt selectStmt,
@Injectable Analyzer analyzer,
+ @Injectable OlapTable table) {
Map<Long, List<Column>> candidateIndexIdToSchema = Maps.newHashMap();
Map<Long, List<Column>> allVisibleIndexes = Maps.newHashMap();
List<Column> index1Columns = Lists.newArrayList();
- Column index1Column1 = new Column("c2", Type.INT, true, null, true,
"", "");
+ Column index1Column1 = new Column("c2", Type.INT, true,
AggregateType.SUM, true, "", "");
index1Columns.add(index1Column1);
allVisibleIndexes.put(new Long(1), index1Columns);
List<Column> index2Columns = Lists.newArrayList();
Column index2Column1 = new Column("c1", Type.INT, true, null, true,
"", "");
index2Columns.add(index2Column1);
- Column index2Column2 = new Column("c2", Type.INT, false,
AggregateType.NONE, true, "", "");
+ Column index2Column2 = new Column("c2", Type.INT, false,
AggregateType.SUM, true, "", "");
index2Columns.add(index2Column2);
allVisibleIndexes.put(new Long(2), index2Columns);
List<Column> index3Columns = Lists.newArrayList();
- Column index3Column1 = new Column("C2", Type.INT, true, null, true,
"", "");
+ Column index3Column1 = new Column("c1", Type.INT, true, null, true,
"", "");
index3Columns.add(index3Column1);
- Column index3Column2 = new Column("c1", Type.INT, false,
AggregateType.SUM, true, "", "");
+ Column index3Column2 = new Column("c3", Type.INT, false,
AggregateType.SUM, true, "", "");
index3Columns.add(index3Column2);
allVisibleIndexes.put(new Long(3), index3Columns);
+ List<Column> keyColumns = Lists.newArrayList();
+ keyColumns.add(index2Column1);
new Expectations() {
{
selectStmt.getAggInfo();
result = null;
selectStmt.getResultExprs();
result = Lists.newArrayList();
+ table.getBaseIndexId();
+ result = -1L;
+ table.getKeyColumnsByIndexId(-1L);
+ result = keyColumns;
+ table.getKeyColumnsByIndexId(1L);
+ result = Lists.newArrayList();
+ table.getKeyColumnsByIndexId(2L);
+ result = keyColumns;
+ table.getKeyColumnsByIndexId(3L);
+ result = keyColumns;
}
};
MaterializedViewSelector selector = new
MaterializedViewSelector(selectStmt, analyzer);
- Deencapsulation.invoke(selector, "compensateIndex",
candidateIndexIdToSchema,
- allVisibleIndexes, 2);
+ Deencapsulation.invoke(selector, "compensateCandidateIndex",
candidateIndexIdToSchema,
+ allVisibleIndexes, table);
Assert.assertEquals(2, candidateIndexIdToSchema.size());
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(2)));
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(3)));
diff --git a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
index 02de02a..048599c 100644
--- a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
+++ b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
@@ -18,10 +18,13 @@
package org.apache.doris.utframe;
import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.analysis.AlterClause;
+import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
@@ -82,19 +85,34 @@ public class DorisAssert {
CreateMaterializedViewStmt createMaterializedViewStmt =
(CreateMaterializedViewStmt)
UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
Catalog.getCurrentCatalog().createMaterializedView(createMaterializedViewStmt);
+ checkAlterJob();
+ // waiting table state to normal
+ Thread.sleep(100);
+ return this;
+ }
+
+ // Add rollup
+ public DorisAssert withRollup(String sql) throws Exception {
+ AlterTableStmt alterTableStmt = (AlterTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+ Catalog.getCurrentCatalog().alterTable(alterTableStmt);
+ checkAlterJob();
+ // waiting table state to normal
+ Thread.sleep(100);
+ return this;
+ }
+
+ private void checkAlterJob() throws InterruptedException {
// check alter job
Map<Long, AlterJobV2> alterJobs =
Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
- System.out.println("alter job " + alterJobV2.getDbId() + " is
running. state: " + alterJobV2.getJobState());
- Thread.sleep(5000);
+ System.out.println("alter job " + alterJobV2.getDbId()
+ + " is running. state: " + alterJobV2.getJobState());
+ Thread.sleep(100);
}
System.out.println("alter job " + alterJobV2.getDbId() + " is
done. state: " + alterJobV2.getJobState());
Assert.assertEquals(AlterJobV2.JobState.FINISHED,
alterJobV2.getJobState());
}
- // waiting table state to normal
- Thread.sleep(5000);
- return this;
}
public QueryAssert query(String sql) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]