amansinha100 commented on code in PR #5096: URL: https://github.com/apache/hive/pull/5096#discussion_r1510026927
########## ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java: ########## @@ -353,29 +353,32 @@ private RelNode applyRecordIncrementalRebuildPlan( MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor(acidView); visitor.go(basePlan); - if (visitor.isRewritingAllowed()) { - if (!materialization.isSourceTablesUpdateDeleteModified()) { - // Trigger rewriting to remove UNION branch with MV + switch (visitor.getIncrementalRebuildMode()) { + case INSERT_ONLY: + if (materialization.isSourceTablesUpdateDeleteModified()) { + return calcitePreMVRewritingPlan; + } + if (visitor.isContainsAggregate()) { return applyAggregateInsertIncremental(basePlan, mdProvider, executorProvider, optCluster, calcitePreMVRewritingPlan); } else { return applyJoinInsertIncremental(basePlan, mdProvider, executorProvider); } - } else { - // count(*) is necessary for determine which rows should be deleted from the view - // if view definition does not have it incremental rebuild can not be performed - if (acidView && visitor.isContainsAggregate() && visitor.getCountIndex() >= 0) { - return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, executorProvider); + case AVAILABLE: + if (!materialization.isSourceTablesUpdateDeleteModified()) { + return applyAggregateInsertIncremental(basePlan, mdProvider, executorProvider, optCluster, calcitePreMVRewritingPlan); } else { + return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, executorProvider); + } + case NOT_AVAILABLE: + default: + if (materialization.isSourceTablesUpdateDeleteModified()) { + // calcitePreMVRewritingPlan is already got the optimizations by applyPreJoinOrderingTransforms prior calling Review Comment: nit: "prior to calling" ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. 
* * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> + * <li>Whether the plane has count(*) aggregate function call</li> Review Comment: nit: 'plan' instead of 'plane' ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewRewritingRelVisitor.java: ########## @@ -45,18 +39,15 @@ */ public class MaterializedViewRewritingRelVisitor extends RelVisitor { - private static final Logger LOG = LoggerFactory.getLogger(MaterializedViewRewritingRelVisitor.class); Review Comment: It would be super useful to have DEBUG-level logging for the cases where the incremental rebuild was not feasible. The log message could indicate the exact reason the incremental rebuild was not applied, since some of the reasons are not obvious.
########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. 
* * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> Review Comment: nit: 'plan' instead of 'plane' ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. * * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> + * <li>Whether the plane has count(*) aggregate function call</li> + * <li>Check whether aggregate functions are supported</li> + * </ul> */ -public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor { +public class MaterializedViewIncrementalRewritingRelVisitor implements ReflectiveVisitor { - private boolean containsAggregate; - private boolean hasAllowedOperatorsOnly; - private boolean hasCountStar; - private boolean insertAllowedOnly; + private final ReflectUtil.MethodDispatcher<Result> dispatcher; public MaterializedViewIncrementalRewritingRelVisitor() { - this.containsAggregate = false; - this.hasAllowedOperatorsOnly = true; - this.hasCountStar = false; - this.insertAllowedOnly = false; + this.dispatcher = ReflectUtil.createMethodDispatcher( + Result.class, this, "visit", RelNode.class); + } + + /** + * Starts an iteration. + */ + public Result go(RelNode relNode) { + Result result = dispatcher.invoke(relNode); + if (result.containsAggregate) { + return result; + } + + if (result.incrementalRebuildMode == AVAILABLE) { + // Incremental rebuild of non-aggregate MV is not supported when any source table has delete operations. 
+ return new Result(INSERT_ONLY); + } + + return result; + } + + public Result visit(RelNode relNode) { + // Only TS, Filter, Join, Project and Aggregate are supported + return new Result(NOT_AVAILABLE); + } + + private Result visitChildOf(RelNode rel) { + return visitChildOf(rel, 0); } - @Override - public void visit(RelNode node, int ordinal, RelNode parent) { - if (node instanceof Aggregate) { - this.containsAggregate = true; - check((Aggregate) node); - super.visit(node, ordinal, parent); - } else if ( - node instanceof Filter || - node instanceof Project || - node instanceof Join) { - super.visit(node, ordinal, parent); - } else if (node instanceof TableScan) { - HiveTableScan scan = (HiveTableScan) node; - RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); - if (hiveTable.getHiveTableMD().getStorageHandler() != null && - hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { + private Result visitChildOf(RelNode rel, int index) { + return dispatcher.invoke(rel.getInput(index)); + } + + public Result visit(HiveTableScan scan) { + RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); + + if (hiveTable.getHiveTableMD().getStorageHandler() != null) { + if (hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { // Incremental rebuild of materialized views with non-native source tables are not implemented // when any of the source tables has delete/update operation since the last rebuild - insertAllowedOnly = true; + return new Result(INSERT_ONLY); + } else { + return new Result(NOT_AVAILABLE); } - } else { - hasAllowedOperatorsOnly = false; } + + return new Result(AVAILABLE); + } + + public Result visit(HiveProject project) { + return visitChildOf(project); + } + + public Result visit(HiveFilter filter) { + return visitChildOf(filter); } - private void check(Aggregate aggregate) { + public Result visit(HiveJoin join) { + if (join.getJoinType() != JoinRelType.INNER) { + return new Result(NOT_AVAILABLE); + } 
+ + Result leftResult = visitChildOf(join, 0); + Result rightResult = visitChildOf(join, 1); + + boolean containsAggregate = leftResult.containsAggregate || rightResult.containsAggregate; + int countStarIndex = leftResult.countIndex; + if (countStarIndex == -1 && rightResult.countIndex != -1) { + countStarIndex = rightResult.countIndex + join.getLeft().getRowType().getFieldCount(); + } + switch (rightResult.incrementalRebuildMode) { + case INSERT_ONLY: + return new Result(INSERT_ONLY, containsAggregate, countStarIndex); + case AVAILABLE: + return new Result( + leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : AVAILABLE, + containsAggregate, + countStarIndex); + case NOT_AVAILABLE: + default: Review Comment: Since there should only be 3 values possible, can we assert with an error message for the default case so we catch any hidden bugs ? We should not fall through to the default. ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import 
org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. * * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> + * <li>Whether the plane has count(*) aggregate function call</li> + * <li>Check whether aggregate functions are supported</li> + * </ul> */ -public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor { +public class MaterializedViewIncrementalRewritingRelVisitor implements ReflectiveVisitor { - private boolean containsAggregate; - private boolean hasAllowedOperatorsOnly; - private boolean hasCountStar; - private boolean insertAllowedOnly; + private final ReflectUtil.MethodDispatcher<Result> dispatcher; public MaterializedViewIncrementalRewritingRelVisitor() { - this.containsAggregate = false; - this.hasAllowedOperatorsOnly = true; - this.hasCountStar = false; - this.insertAllowedOnly = false; + this.dispatcher = ReflectUtil.createMethodDispatcher( + Result.class, this, "visit", RelNode.class); + } + + /** + * Starts an iteration. 
+ */ + public Result go(RelNode relNode) { + Result result = dispatcher.invoke(relNode); + if (result.containsAggregate) { + return result; + } + + if (result.incrementalRebuildMode == AVAILABLE) { + // Incremental rebuild of non-aggregate MV is not supported when any source table has delete operations. + return new Result(INSERT_ONLY); + } + + return result; + } + + public Result visit(RelNode relNode) { + // Only TS, Filter, Join, Project and Aggregate are supported + return new Result(NOT_AVAILABLE); + } + + private Result visitChildOf(RelNode rel) { + return visitChildOf(rel, 0); } - @Override - public void visit(RelNode node, int ordinal, RelNode parent) { - if (node instanceof Aggregate) { - this.containsAggregate = true; - check((Aggregate) node); - super.visit(node, ordinal, parent); - } else if ( - node instanceof Filter || - node instanceof Project || - node instanceof Join) { - super.visit(node, ordinal, parent); - } else if (node instanceof TableScan) { - HiveTableScan scan = (HiveTableScan) node; - RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); - if (hiveTable.getHiveTableMD().getStorageHandler() != null && - hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { + private Result visitChildOf(RelNode rel, int index) { + return dispatcher.invoke(rel.getInput(index)); + } + + public Result visit(HiveTableScan scan) { + RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); + + if (hiveTable.getHiveTableMD().getStorageHandler() != null) { + if (hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { // Incremental rebuild of materialized views with non-native source tables are not implemented // when any of the source tables has delete/update operation since the last rebuild - insertAllowedOnly = true; + return new Result(INSERT_ONLY); + } else { + return new Result(NOT_AVAILABLE); } - } else { - hasAllowedOperatorsOnly = false; } + + return new Result(AVAILABLE); + } + + public Result 
visit(HiveProject project) { + return visitChildOf(project); + } + + public Result visit(HiveFilter filter) { + return visitChildOf(filter); } - private void check(Aggregate aggregate) { + public Result visit(HiveJoin join) { + if (join.getJoinType() != JoinRelType.INNER) { + return new Result(NOT_AVAILABLE); + } + + Result leftResult = visitChildOf(join, 0); + Result rightResult = visitChildOf(join, 1); + + boolean containsAggregate = leftResult.containsAggregate || rightResult.containsAggregate; + int countStarIndex = leftResult.countIndex; + if (countStarIndex == -1 && rightResult.countIndex != -1) { + countStarIndex = rightResult.countIndex + join.getLeft().getRowType().getFieldCount(); + } + switch (rightResult.incrementalRebuildMode) { + case INSERT_ONLY: + return new Result(INSERT_ONLY, containsAggregate, countStarIndex); + case AVAILABLE: + return new Result( + leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : AVAILABLE, + containsAggregate, + countStarIndex); + case NOT_AVAILABLE: + default: + return new Result(rightResult.incrementalRebuildMode, containsAggregate, countStarIndex); + } + } + + public Result visit(HiveAggregate aggregate) { + Result result = visitChildOf(aggregate); + if (result.incrementalRebuildMode != AVAILABLE) { + return new Result(result.incrementalRebuildMode, true, -1); + } + + Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new HashMap<>(aggregate.getRowType().getFieldCount()); + + int countStarIndex = -1; for (int i = 0; i < aggregate.getAggCallList().size(); ++i) { AggregateCall aggregateCall = aggregate.getAggCallList().get(i); - if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && aggregateCall.getArgList().size() == 0) { - hasCountStar = true; - break; + if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && aggregateCall.getArgList().isEmpty()) { + countStarIndex = i; + continue; + } + + for (Integer argIndex : aggregateCall.getArgList()) { + 
columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new HashSet<>()); + Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex); + aggregates.add(aggregateCall.getAggregation().getKind()); } } - } - /** - * Starts an iteration. - */ - public RelNode go(RelNode p) { - visit(p, 0, null); - return p; - } + IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? INSERT_ONLY : AVAILABLE; + for (int i = 0; i < aggregate.getAggCallList().size(); ++i) { + AggregateCall aggregateCall = aggregate.getAggCallList().get(i); + switch (aggregateCall.getAggregation().getKind()) { + case COUNT: + case SUM: + case SUM0: + break; - public boolean isContainsAggregate() { - return containsAggregate; - } + case AVG: + Set<SqlKind> aggregates = columnRefByAggregateCall.get(aggregateCall.getArgList().get(0)); + if (!(aggregates.contains(SqlKind.SUM) && aggregates.contains(SqlKind.COUNT))) { Review Comment: Should this also be checking that this is not a COUNT(DISTINCT)? E.g., if you have SELECT SUM(b1), COUNT(DISTINCT b1), this cannot be used for AVG.
So something like `aggregates.contains(SqlKind.COUNT) && !aggregateCall.isDistinct()` ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. 
* * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> + * <li>Whether the plane has count(*) aggregate function call</li> + * <li>Check whether aggregate functions are supported</li> + * </ul> */ -public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor { +public class MaterializedViewIncrementalRewritingRelVisitor implements ReflectiveVisitor { - private boolean containsAggregate; - private boolean hasAllowedOperatorsOnly; - private boolean hasCountStar; - private boolean insertAllowedOnly; + private final ReflectUtil.MethodDispatcher<Result> dispatcher; public MaterializedViewIncrementalRewritingRelVisitor() { - this.containsAggregate = false; - this.hasAllowedOperatorsOnly = true; - this.hasCountStar = false; - this.insertAllowedOnly = false; + this.dispatcher = ReflectUtil.createMethodDispatcher( + Result.class, this, "visit", RelNode.class); + } + + /** + * Starts an iteration. + */ + public Result go(RelNode relNode) { + Result result = dispatcher.invoke(relNode); + if (result.containsAggregate) { + return result; + } + + if (result.incrementalRebuildMode == AVAILABLE) { + // Incremental rebuild of non-aggregate MV is not supported when any source table has delete operations. 
+ return new Result(INSERT_ONLY); + } + + return result; + } + + public Result visit(RelNode relNode) { + // Only TS, Filter, Join, Project and Aggregate are supported + return new Result(NOT_AVAILABLE); + } + + private Result visitChildOf(RelNode rel) { + return visitChildOf(rel, 0); } - @Override - public void visit(RelNode node, int ordinal, RelNode parent) { - if (node instanceof Aggregate) { - this.containsAggregate = true; - check((Aggregate) node); - super.visit(node, ordinal, parent); - } else if ( - node instanceof Filter || - node instanceof Project || - node instanceof Join) { - super.visit(node, ordinal, parent); - } else if (node instanceof TableScan) { - HiveTableScan scan = (HiveTableScan) node; - RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); - if (hiveTable.getHiveTableMD().getStorageHandler() != null && - hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { + private Result visitChildOf(RelNode rel, int index) { + return dispatcher.invoke(rel.getInput(index)); + } + + public Result visit(HiveTableScan scan) { + RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); + + if (hiveTable.getHiveTableMD().getStorageHandler() != null) { + if (hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { // Incremental rebuild of materialized views with non-native source tables are not implemented // when any of the source tables has delete/update operation since the last rebuild - insertAllowedOnly = true; + return new Result(INSERT_ONLY); + } else { + return new Result(NOT_AVAILABLE); } - } else { - hasAllowedOperatorsOnly = false; } + + return new Result(AVAILABLE); + } + + public Result visit(HiveProject project) { + return visitChildOf(project); + } + + public Result visit(HiveFilter filter) { + return visitChildOf(filter); } - private void check(Aggregate aggregate) { + public Result visit(HiveJoin join) { + if (join.getJoinType() != JoinRelType.INNER) { + return new Result(NOT_AVAILABLE); + } 
+ + Result leftResult = visitChildOf(join, 0); + Result rightResult = visitChildOf(join, 1); + + boolean containsAggregate = leftResult.containsAggregate || rightResult.containsAggregate; + int countStarIndex = leftResult.countIndex; + if (countStarIndex == -1 && rightResult.countIndex != -1) { + countStarIndex = rightResult.countIndex + join.getLeft().getRowType().getFieldCount(); + } + switch (rightResult.incrementalRebuildMode) { + case INSERT_ONLY: + return new Result(INSERT_ONLY, containsAggregate, countStarIndex); Review Comment: What if the leftResult was NOT_AVAILABLE ? In that case, we should return NOT_AVAILABLE even if the rightResult is INSERT_ONLY. Might be easier/more intuitive to express the join case as an If statement instead of switch since you can check both left and right side of the join together. ########## ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/show/ShowMaterializedViewsFormatter.java: ########## @@ -139,7 +138,7 @@ private static String formatIncrementalRebuildMode(Table materializedView) { String incrementalRebuild; HiveRelOptMaterialization relOptMaterialization = HiveMaterializedViewsRegistry.get(). getRewritingMaterializedView(materializedView.getDbName(), materializedView.getTableName(), ALL); - if (relOptMaterialization == null || relOptMaterialization.getRebuildMode() == UNKNOWN) { + if (relOptMaterialization == null) { incrementalRebuild = "Unknown"; Review Comment: Since you have removed the UNKNOWN state in the enum, why not just treat this Not Available instead of 'Unknown'. 
########## iceberg/iceberg-handler/src/test/results/positive/show_iceberg_materialized_views.q.out: ########## @@ -229,7 +229,7 @@ shtb_aggr_view2 Yes Manual refresh Available in pres shtb_full_view2 Yes Manual refresh (Valid for 5min) Available in presence of insert operations only shtb_test1_view1 No Manual refresh Unknown shtb_test1_view2 Yes Manual refresh (Valid always) Available in presence of insert operations only -shtb_view_on_native Yes Manual refresh Available +shtb_view_on_native Yes Manual refresh Available in presence of insert operations only Review Comment: nit: This is an existing message, but there's a missing 'the': 'Available in the presence of ..' (a quick grammar check can be done on grammarly.com) Or if you want to shorten it: 'Available for insert operations only' ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import 
org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. * * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> + * <li>Whether the plane has count(*) aggregate function call</li> + * <li>Check whether aggregate functions are supported</li> + * </ul> */ -public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor { +public class MaterializedViewIncrementalRewritingRelVisitor implements ReflectiveVisitor { - private boolean containsAggregate; - private boolean hasAllowedOperatorsOnly; - private boolean hasCountStar; - private boolean insertAllowedOnly; + private final ReflectUtil.MethodDispatcher<Result> dispatcher; public MaterializedViewIncrementalRewritingRelVisitor() { - this.containsAggregate = false; - this.hasAllowedOperatorsOnly = true; - this.hasCountStar = false; - this.insertAllowedOnly = false; + this.dispatcher = ReflectUtil.createMethodDispatcher( + Result.class, this, "visit", RelNode.class); + } + + /** + * Starts an iteration. 
+ */ + public Result go(RelNode relNode) { + Result result = dispatcher.invoke(relNode); + if (result.containsAggregate) { + return result; + } + + if (result.incrementalRebuildMode == AVAILABLE) { + // Incremental rebuild of non-aggregate MV is not supported when any source table has delete operations. + return new Result(INSERT_ONLY); + } + + return result; + } + + public Result visit(RelNode relNode) { + // Only TS, Filter, Join, Project and Aggregate are supported + return new Result(NOT_AVAILABLE); + } + + private Result visitChildOf(RelNode rel) { + return visitChildOf(rel, 0); } - @Override - public void visit(RelNode node, int ordinal, RelNode parent) { - if (node instanceof Aggregate) { - this.containsAggregate = true; - check((Aggregate) node); - super.visit(node, ordinal, parent); - } else if ( - node instanceof Filter || - node instanceof Project || - node instanceof Join) { - super.visit(node, ordinal, parent); - } else if (node instanceof TableScan) { - HiveTableScan scan = (HiveTableScan) node; - RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); - if (hiveTable.getHiveTableMD().getStorageHandler() != null && - hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { + private Result visitChildOf(RelNode rel, int index) { + return dispatcher.invoke(rel.getInput(index)); + } + + public Result visit(HiveTableScan scan) { + RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); + + if (hiveTable.getHiveTableMD().getStorageHandler() != null) { + if (hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { // Incremental rebuild of materialized views with non-native source tables are not implemented // when any of the source tables has delete/update operation since the last rebuild - insertAllowedOnly = true; + return new Result(INSERT_ONLY); + } else { + return new Result(NOT_AVAILABLE); } - } else { - hasAllowedOperatorsOnly = false; } + + return new Result(AVAILABLE); + } + + public Result 
visit(HiveProject project) { + return visitChildOf(project); + } + + public Result visit(HiveFilter filter) { + return visitChildOf(filter); } - private void check(Aggregate aggregate) { + public Result visit(HiveJoin join) { + if (join.getJoinType() != JoinRelType.INNER) { + return new Result(NOT_AVAILABLE); + } + + Result leftResult = visitChildOf(join, 0); + Result rightResult = visitChildOf(join, 1); + + boolean containsAggregate = leftResult.containsAggregate || rightResult.containsAggregate; + int countStarIndex = leftResult.countIndex; + if (countStarIndex == -1 && rightResult.countIndex != -1) { + countStarIndex = rightResult.countIndex + join.getLeft().getRowType().getFieldCount(); + } + switch (rightResult.incrementalRebuildMode) { + case INSERT_ONLY: + return new Result(INSERT_ONLY, containsAggregate, countStarIndex); + case AVAILABLE: + return new Result( + leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : AVAILABLE, + containsAggregate, + countStarIndex); + case NOT_AVAILABLE: + default: + return new Result(rightResult.incrementalRebuildMode, containsAggregate, countStarIndex); + } + } + + public Result visit(HiveAggregate aggregate) { + Result result = visitChildOf(aggregate); + if (result.incrementalRebuildMode != AVAILABLE) { + return new Result(result.incrementalRebuildMode, true, -1); + } + + Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new HashMap<>(aggregate.getRowType().getFieldCount()); + + int countStarIndex = -1; for (int i = 0; i < aggregate.getAggCallList().size(); ++i) { AggregateCall aggregateCall = aggregate.getAggCallList().get(i); - if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && aggregateCall.getArgList().size() == 0) { - hasCountStar = true; - break; + if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && aggregateCall.getArgList().isEmpty()) { + countStarIndex = i; + continue; + } + + for (Integer argIndex : aggregateCall.getArgList()) { + 
columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new HashSet<>()); + Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex); + aggregates.add(aggregateCall.getAggregation().getKind()); } } - } - /** - * Starts an iteration. - */ - public RelNode go(RelNode p) { - visit(p, 0, null); - return p; - } + IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? INSERT_ONLY : AVAILABLE; + for (int i = 0; i < aggregate.getAggCallList().size(); ++i) { + AggregateCall aggregateCall = aggregate.getAggCallList().get(i); + switch (aggregateCall.getAggregation().getKind()) { + case COUNT: + case SUM: + case SUM0: + break; - public boolean isContainsAggregate() { - return containsAggregate; - } + case AVG: + Set<SqlKind> aggregates = columnRefByAggregateCall.get(aggregateCall.getArgList().get(0)); + if (!(aggregates.contains(SqlKind.SUM) && aggregates.contains(SqlKind.COUNT))) { + incrementalRebuildMode = NOT_AVAILABLE; + } + break; - public boolean hasAllowedOperatorsOnly() { - return hasAllowedOperatorsOnly; - } + case MIN: + case MAX: + incrementalRebuildMode = INSERT_ONLY; Review Comment: If we have a mix of aggregate functions, some of which are incrementally maintainable (e.g. MIN/MAX) and some of which are not (e.g. VARIANCE), it looks like `incrementalRebuildMode` gets overwritten by whichever aggregate is processed last in the for loop. If any of the aggregate functions yields NOT_AVAILABLE, the MV should be treated as not eligible for incremental refresh, regardless of the order of the aggregate calls. 
########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -17,97 +17,210 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY; +import static org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE; + /** * This class is a helper to check whether a materialized view rebuild * can be transformed from INSERT OVERWRITE to INSERT INTO. 
* * We are verifying that: - * 1) Plan only uses legal operators (i.e., Filter, Project, - * Join, and TableScan) - * 2) Whether the plane has aggregate - * 3) Whether the plane has an count(*) aggregate function call + * <ul> + * <li>Plan only uses legal operators (i.e., Filter, Project, Join, and TableScan)</li> + * <li>Whether the plane has aggregate</li> + * <li>Whether the plane has count(*) aggregate function call</li> + * <li>Check whether aggregate functions are supported</li> + * </ul> */ -public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor { +public class MaterializedViewIncrementalRewritingRelVisitor implements ReflectiveVisitor { - private boolean containsAggregate; - private boolean hasAllowedOperatorsOnly; - private boolean hasCountStar; - private boolean insertAllowedOnly; + private final ReflectUtil.MethodDispatcher<Result> dispatcher; public MaterializedViewIncrementalRewritingRelVisitor() { - this.containsAggregate = false; - this.hasAllowedOperatorsOnly = true; - this.hasCountStar = false; - this.insertAllowedOnly = false; + this.dispatcher = ReflectUtil.createMethodDispatcher( + Result.class, this, "visit", RelNode.class); + } + + /** + * Starts an iteration. + */ + public Result go(RelNode relNode) { + Result result = dispatcher.invoke(relNode); + if (result.containsAggregate) { + return result; + } + + if (result.incrementalRebuildMode == AVAILABLE) { + // Incremental rebuild of non-aggregate MV is not supported when any source table has delete operations. 
+ return new Result(INSERT_ONLY); + } + + return result; + } + + public Result visit(RelNode relNode) { + // Only TS, Filter, Join, Project and Aggregate are supported + return new Result(NOT_AVAILABLE); + } + + private Result visitChildOf(RelNode rel) { + return visitChildOf(rel, 0); } - @Override - public void visit(RelNode node, int ordinal, RelNode parent) { - if (node instanceof Aggregate) { - this.containsAggregate = true; - check((Aggregate) node); - super.visit(node, ordinal, parent); - } else if ( - node instanceof Filter || - node instanceof Project || - node instanceof Join) { - super.visit(node, ordinal, parent); - } else if (node instanceof TableScan) { - HiveTableScan scan = (HiveTableScan) node; - RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); - if (hiveTable.getHiveTableMD().getStorageHandler() != null && - hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { + private Result visitChildOf(RelNode rel, int index) { + return dispatcher.invoke(rel.getInput(index)); + } + + public Result visit(HiveTableScan scan) { + RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable(); + + if (hiveTable.getHiveTableMD().getStorageHandler() != null) { + if (hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) { // Incremental rebuild of materialized views with non-native source tables are not implemented // when any of the source tables has delete/update operation since the last rebuild - insertAllowedOnly = true; + return new Result(INSERT_ONLY); + } else { + return new Result(NOT_AVAILABLE); } - } else { - hasAllowedOperatorsOnly = false; } + + return new Result(AVAILABLE); + } + + public Result visit(HiveProject project) { + return visitChildOf(project); + } + + public Result visit(HiveFilter filter) { + return visitChildOf(filter); } - private void check(Aggregate aggregate) { + public Result visit(HiveJoin join) { + if (join.getJoinType() != JoinRelType.INNER) { + return new Result(NOT_AVAILABLE); + } 
+ + Result leftResult = visitChildOf(join, 0); + Result rightResult = visitChildOf(join, 1); + + boolean containsAggregate = leftResult.containsAggregate || rightResult.containsAggregate; + int countStarIndex = leftResult.countIndex; + if (countStarIndex == -1 && rightResult.countIndex != -1) { + countStarIndex = rightResult.countIndex + join.getLeft().getRowType().getFieldCount(); + } + switch (rightResult.incrementalRebuildMode) { + case INSERT_ONLY: + return new Result(INSERT_ONLY, containsAggregate, countStarIndex); + case AVAILABLE: + return new Result( + leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : AVAILABLE, + containsAggregate, + countStarIndex); + case NOT_AVAILABLE: + default: + return new Result(rightResult.incrementalRebuildMode, containsAggregate, countStarIndex); + } + } + + public Result visit(HiveAggregate aggregate) { + Result result = visitChildOf(aggregate); + if (result.incrementalRebuildMode != AVAILABLE) { + return new Result(result.incrementalRebuildMode, true, -1); + } + + Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new HashMap<>(aggregate.getRowType().getFieldCount()); + + int countStarIndex = -1; for (int i = 0; i < aggregate.getAggCallList().size(); ++i) { AggregateCall aggregateCall = aggregate.getAggCallList().get(i); - if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && aggregateCall.getArgList().size() == 0) { - hasCountStar = true; - break; + if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && aggregateCall.getArgList().isEmpty()) { + countStarIndex = i; + continue; + } + + for (Integer argIndex : aggregateCall.getArgList()) { + columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new HashSet<>()); + Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex); + aggregates.add(aggregateCall.getAggregation().getKind()); } } - } - /** - * Starts an iteration. 
- */ - public RelNode go(RelNode p) { - visit(p, 0, null); - return p; - } + IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? INSERT_ONLY : AVAILABLE; + for (int i = 0; i < aggregate.getAggCallList().size(); ++i) { + AggregateCall aggregateCall = aggregate.getAggCallList().get(i); + switch (aggregateCall.getAggregation().getKind()) { + case COUNT: + case SUM: + case SUM0: + break; - public boolean isContainsAggregate() { - return containsAggregate; - } + case AVG: + Set<SqlKind> aggregates = columnRefByAggregateCall.get(aggregateCall.getArgList().get(0)); + if (!(aggregates.contains(SqlKind.SUM) && aggregates.contains(SqlKind.COUNT))) { + incrementalRebuildMode = NOT_AVAILABLE; + } + break; - public boolean hasAllowedOperatorsOnly() { - return hasAllowedOperatorsOnly; - } + case MIN: + case MAX: + incrementalRebuildMode = INSERT_ONLY; + break; + + default: + incrementalRebuildMode = NOT_AVAILABLE; + break; + } + } - public boolean isInsertAllowedOnly() { - return insertAllowedOnly; + return new Result(incrementalRebuildMode, true, countStarIndex); } - public boolean hasCountStar() { - return hasCountStar; + public static final class Result { + private final IncrementalRebuildMode incrementalRebuildMode; + private final boolean containsAggregate; + private final int countIndex; Review Comment: nit: please rename this field to countStarIndex so that it is not confused with the index of a non-count-star COUNT(col) call. ########## ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/TestMaterializedViewIncrementalRewritingRelVisitor.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.doReturn; + +@RunWith(MockitoJUnitRunner.class) +public class TestMaterializedViewIncrementalRewritingRelVisitor extends TestRuleBase { + + @Test + public void testIncrementalRebuildIsNotAvailableWhenPlanHasUnsupportedOperator() { + RelNode ts1 = createTS(t1NativeMock, "t1"); + + RelNode mvQueryPlan = REL_BUILDER + .push(ts1) + .sort(1) // Order by is not supported + .build(); + + MaterializedViewIncrementalRewritingRelVisitor visitor = new MaterializedViewIncrementalRewritingRelVisitor(); + assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), is(IncrementalRebuildMode.NOT_AVAILABLE)); + } + + @Test + public void testIncrementalRebuildIsInsertOnlyWhenPlanHasTSOnNonNativeTable() { + RelNode ts1 = createT2IcebergTS(); + + RelNode mvQueryPlan = REL_BUILDER + .push(ts1) + .build(); + + MaterializedViewIncrementalRewritingRelVisitor visitor = new MaterializedViewIncrementalRewritingRelVisitor(); + assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), is(IncrementalRebuildMode.NOT_AVAILABLE)); + } + + @Test + 
public void testIncrementalRebuildIsInsertOnlyWhenPlanHasTSOnNonNativeTableSupportsSnapshots() { + doReturn(true).when(table2storageHandler).areSnapshotsSupported(); + RelNode ts1 = createT2IcebergTS(); + + RelNode mvQueryPlan = REL_BUILDER + .push(ts1) + .build(); + + MaterializedViewIncrementalRewritingRelVisitor visitor = new MaterializedViewIncrementalRewritingRelVisitor(); + assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), is(IncrementalRebuildMode.INSERT_ONLY)); + } + + @Test + public void testIncrementalRebuildIsInsertOnlyWhenPlanHasFilter() { + RelNode ts1 = createTS(t1NativeMock, "t1"); + + RelNode mvQueryPlan = REL_BUILDER + .push(ts1) + .filter(REX_BUILDER.makeCall(SqlStdOperatorTable.IS_NOT_NULL, REX_BUILDER.makeInputRef(ts1, 0))) + .build(); + + MaterializedViewIncrementalRewritingRelVisitor visitor = new MaterializedViewIncrementalRewritingRelVisitor(); + MaterializedViewIncrementalRewritingRelVisitor.Result result = visitor.go(mvQueryPlan); + + assertThat(result.getIncrementalRebuildMode(), is(IncrementalRebuildMode.INSERT_ONLY)); + assertThat(result.containsAggregate(), is(false)); + } + + @Test + public void testIncrementalRebuildIsInsertOnlyWhenPlanHasInnerJoin() { + RelNode ts1 = createTS(t1NativeMock, "t1"); + RelNode ts2 = createTS(t2NativeMock, "t2"); + + RexNode joinCondition = REX_BUILDER.makeCall(SqlStdOperatorTable.EQUALS, + REX_BUILDER.makeInputRef(ts1.getRowType().getFieldList().get(0).getType(), 0), + REX_BUILDER.makeInputRef(ts2.getRowType().getFieldList().get(0).getType(), 5)); + + RelNode mvQueryPlan = REL_BUILDER + .push(ts1) + .push(ts2) + .join(JoinRelType.INNER, joinCondition) + .build(); + + MaterializedViewIncrementalRewritingRelVisitor visitor = new MaterializedViewIncrementalRewritingRelVisitor(); + assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), is(IncrementalRebuildMode.INSERT_ONLY)); + } + + @Test + public void testIncrementalRebuildIsNotAvailableWhenPlanHasJoinOtherThanInner() { + RelNode 
ts1 = createTS(t1NativeMock, "t1"); + RelNode ts2 = createTS(t2NativeMock, "t2"); + + RexNode joinCondition = REX_BUILDER.makeCall(SqlStdOperatorTable.EQUALS, + REX_BUILDER.makeInputRef(ts1.getRowType().getFieldList().get(0).getType(), 0), + REX_BUILDER.makeInputRef(ts2.getRowType().getFieldList().get(0).getType(), 5)); + + RelNode mvQueryPlan = REL_BUILDER + .push(ts1) + .push(ts2) + .join(JoinRelType.LEFT, joinCondition) + .build(); + + MaterializedViewIncrementalRewritingRelVisitor visitor = new MaterializedViewIncrementalRewritingRelVisitor(); + assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), is(IncrementalRebuildMode.NOT_AVAILABLE)); + } + + @Test + public void testIncrementalRebuildIsInsertOnlyWhenPlanHasAggregate() { Review Comment: I notice that all the tests with aggregates have a grouping key. Can we also add a couple of tests with plain (non-grouped) aggregates? Those should also be incrementally maintainable as long as they satisfy the other criteria. For a plain aggregate, I believe we still need a COUNT(*) to check whether a DELETE on the source table removes all the rows. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org