amansinha100 commented on code in PR #5096:
URL: https://github.com/apache/hive/pull/5096#discussion_r1510026927


##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java:
##########
@@ -353,29 +353,32 @@ private RelNode applyRecordIncrementalRebuildPlan(
       MaterializedViewRewritingRelVisitor visitor =
           new MaterializedViewRewritingRelVisitor(acidView);
       visitor.go(basePlan);
-      if (visitor.isRewritingAllowed()) {
-        if (!materialization.isSourceTablesUpdateDeleteModified()) {
-          // Trigger rewriting to remove UNION branch with MV
+      switch (visitor.getIncrementalRebuildMode()) {
+        case INSERT_ONLY:
+          if (materialization.isSourceTablesUpdateDeleteModified()) {
+            return calcitePreMVRewritingPlan;
+          }
+
           if (visitor.isContainsAggregate()) {
             return applyAggregateInsertIncremental(basePlan, mdProvider, 
executorProvider, optCluster, calcitePreMVRewritingPlan);
           } else {
             return applyJoinInsertIncremental(basePlan, mdProvider, 
executorProvider);
           }
-        } else {
-          // count(*) is necessary for determine which rows should be deleted 
from the view
-          // if view definition does not have it incremental rebuild can not 
be performed
-          if (acidView && visitor.isContainsAggregate() && 
visitor.getCountIndex() >= 0) {
-            return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, 
executorProvider);
+        case AVAILABLE:
+          if (!materialization.isSourceTablesUpdateDeleteModified()) {
+            return applyAggregateInsertIncremental(basePlan, mdProvider, 
executorProvider, optCluster, calcitePreMVRewritingPlan);
           } else {
+            return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, 
executorProvider);
+          }
+        case NOT_AVAILABLE:
+        default:
+          if (materialization.isSourceTablesUpdateDeleteModified()) {
+            // calcitePreMVRewritingPlan is already got the optimizations by 
applyPreJoinOrderingTransforms prior calling

Review Comment:
   nit: "prior to calling"



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>

Review Comment:
   nit: 'plan'



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewRewritingRelVisitor.java:
##########
@@ -45,18 +39,15 @@
  */
 public class MaterializedViewRewritingRelVisitor extends RelVisitor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(MaterializedViewRewritingRelVisitor.class);

Review Comment:
   It would be super useful to have DEBUG level logging for the cases where the 
incremental rebuild was not feasible.  The log message could indicate the exact 
reason why this was not applied, since some of the reasons are not obvious. 



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>

Review Comment:
   nit: 'plan' instead of 'plane'



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    if (hiveTable.getHiveTableMD().getStorageHandler() != null) {
+      if 
(hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        return new Result(INSERT_ONLY);
+      } else {
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    int countStarIndex = leftResult.countIndex;
+    if (countStarIndex == -1 && rightResult.countIndex != -1) {
+      countStarIndex = rightResult.countIndex + 
join.getLeft().getRowType().getFieldCount();
+    }
+    switch (rightResult.incrementalRebuildMode) {
+      case INSERT_ONLY:
+        return new Result(INSERT_ONLY, containsAggregate, countStarIndex);
+      case AVAILABLE:
+        return new Result(
+            leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : 
AVAILABLE,
+            containsAggregate,
+            countStarIndex);
+      case NOT_AVAILABLE:
+      default:

Review Comment:
   Since there should only be 3 possible values, can we assert with an error 
message in the default case so that we catch any hidden bugs? We should never 
fall through to the default. 



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    if (hiveTable.getHiveTableMD().getStorageHandler() != null) {
+      if 
(hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        return new Result(INSERT_ONLY);
+      } else {
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    int countStarIndex = leftResult.countIndex;
+    if (countStarIndex == -1 && rightResult.countIndex != -1) {
+      countStarIndex = rightResult.countIndex + 
join.getLeft().getRowType().getFieldCount();
+    }
+    switch (rightResult.incrementalRebuildMode) {
+      case INSERT_ONLY:
+        return new Result(INSERT_ONLY, containsAggregate, countStarIndex);
+      case AVAILABLE:
+        return new Result(
+            leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : 
AVAILABLE,
+            containsAggregate,
+            countStarIndex);
+      case NOT_AVAILABLE:
+      default:
+        return new Result(rightResult.incrementalRebuildMode, 
containsAggregate, countStarIndex);
+    }
+  }
+
+  public Result visit(HiveAggregate aggregate) {
+    Result result = visitChildOf(aggregate);
+    if (result.incrementalRebuildMode != AVAILABLE) {
+      return new Result(result.incrementalRebuildMode, true, -1);
+    }
+
+    Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new 
HashMap<>(aggregate.getRowType().getFieldCount());
+
+    int countStarIndex = -1;
     for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
       AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
-      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().size() == 0) {
-        hasCountStar = true;
-        break;
+      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().isEmpty()) {
+        countStarIndex = i;
+        continue;
+      }
+
+      for (Integer argIndex : aggregateCall.getArgList()) {
+        columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new 
HashSet<>());
+        Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex);
+        aggregates.add(aggregateCall.getAggregation().getKind());
       }
     }
-  }
 
-  /**
-   * Starts an iteration.
-   */
-  public RelNode go(RelNode p) {
-    visit(p, 0, null);
-    return p;
-  }
+    IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? 
INSERT_ONLY : AVAILABLE;
+    for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
+      AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
+      switch (aggregateCall.getAggregation().getKind()) {
+        case COUNT:
+        case SUM:
+        case SUM0:
+          break;
 
-  public boolean isContainsAggregate() {
-    return containsAggregate;
-  }
+        case AVG:
+          Set<SqlKind> aggregates = 
columnRefByAggregateCall.get(aggregateCall.getArgList().get(0));
+          if (!(aggregates.contains(SqlKind.SUM) && 
aggregates.contains(SqlKind.COUNT))) {

Review Comment:
   Should this also be checking that this is not a COUNT(DISTINCT)?
   E.g., if you have 
     SELECT SUM(b1), COUNT(DISTINCT b1) 
   this cannot be used for AVG. 
   So something like 
     `aggregates.contains(SqlKind.COUNT) && !aggregateCall.isDistinct()`



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    if (hiveTable.getHiveTableMD().getStorageHandler() != null) {
+      if 
(hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        return new Result(INSERT_ONLY);
+      } else {
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    int countStarIndex = leftResult.countIndex;
+    if (countStarIndex == -1 && rightResult.countIndex != -1) {
+      countStarIndex = rightResult.countIndex + 
join.getLeft().getRowType().getFieldCount();
+    }
+    switch (rightResult.incrementalRebuildMode) {
+      case INSERT_ONLY:
+        return new Result(INSERT_ONLY, containsAggregate, countStarIndex);

Review Comment:
   What if the leftResult is NOT_AVAILABLE? In that case, we should return 
NOT_AVAILABLE even if the rightResult is INSERT_ONLY. It might be easier/more 
intuitive to express the join case as an if statement instead of a switch, since 
you can then check both the left and right sides of the join together. 



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/show/ShowMaterializedViewsFormatter.java:
##########
@@ -139,7 +138,7 @@ private static String formatIncrementalRebuildMode(Table 
materializedView) {
     String incrementalRebuild;
     HiveRelOptMaterialization relOptMaterialization = 
HiveMaterializedViewsRegistry.get().
             getRewritingMaterializedView(materializedView.getDbName(), 
materializedView.getTableName(), ALL);
-    if (relOptMaterialization == null || 
relOptMaterialization.getRebuildMode() == UNKNOWN) {
+    if (relOptMaterialization == null) {
       incrementalRebuild = "Unknown";

Review Comment:
   Since you have removed the UNKNOWN state in the enum, why not just treat 
this as 'Not Available' instead of 'Unknown'? 



##########
iceberg/iceberg-handler/src/test/results/positive/show_iceberg_materialized_views.q.out:
##########
@@ -229,7 +229,7 @@ shtb_aggr_view2             Yes                     Manual 
refresh          Available in pres
 shtb_full_view2        Yes                     Manual refresh (Valid for 5min) 
Available in presence of insert operations only
 shtb_test1_view1       No                      Manual refresh          Unknown 
            
 shtb_test1_view2       Yes                     Manual refresh (Valid always)   
Available in presence of insert operations only
-shtb_view_on_native    Yes                     Manual refresh          
Available           
+shtb_view_on_native    Yes                     Manual refresh          
Available in presence of insert operations only

Review Comment:
   nit: This is an existing message, but there's a missing 'the':  'Available 
in the presence of ..'   (a quick grammar check can be done on grammarly.com)
   Or if you want to shorten it: 'Available for insert operations only'



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    if (hiveTable.getHiveTableMD().getStorageHandler() != null) {
+      if 
(hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        return new Result(INSERT_ONLY);
+      } else {
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    int countStarIndex = leftResult.countIndex;
+    if (countStarIndex == -1 && rightResult.countIndex != -1) {
+      countStarIndex = rightResult.countIndex + 
join.getLeft().getRowType().getFieldCount();
+    }
+    switch (rightResult.incrementalRebuildMode) {
+      case INSERT_ONLY:
+        return new Result(INSERT_ONLY, containsAggregate, countStarIndex);
+      case AVAILABLE:
+        return new Result(
+            leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : 
AVAILABLE,
+            containsAggregate,
+            countStarIndex);
+      case NOT_AVAILABLE:
+      default:
+        return new Result(rightResult.incrementalRebuildMode, 
containsAggregate, countStarIndex);
+    }
+  }
+
+  public Result visit(HiveAggregate aggregate) {
+    Result result = visitChildOf(aggregate);
+    if (result.incrementalRebuildMode != AVAILABLE) {
+      return new Result(result.incrementalRebuildMode, true, -1);
+    }
+
+    Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new 
HashMap<>(aggregate.getRowType().getFieldCount());
+
+    int countStarIndex = -1;
     for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
       AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
-      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().size() == 0) {
-        hasCountStar = true;
-        break;
+      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().isEmpty()) {
+        countStarIndex = i;
+        continue;
+      }
+
+      for (Integer argIndex : aggregateCall.getArgList()) {
+        columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new 
HashSet<>());
+        Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex);
+        aggregates.add(aggregateCall.getAggregation().getKind());
       }
     }
-  }
 
-  /**
-   * Starts an iteration.
-   */
-  public RelNode go(RelNode p) {
-    visit(p, 0, null);
-    return p;
-  }
+    IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? 
INSERT_ONLY : AVAILABLE;
+    for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
+      AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
+      switch (aggregateCall.getAggregation().getKind()) {
+        case COUNT:
+        case SUM:
+        case SUM0:
+          break;
 
-  public boolean isContainsAggregate() {
-    return containsAggregate;
-  }
+        case AVG:
+          Set<SqlKind> aggregates = 
columnRefByAggregateCall.get(aggregateCall.getArgList().get(0));
+          if (!(aggregates.contains(SqlKind.SUM) && 
aggregates.contains(SqlKind.COUNT))) {
+            incrementalRebuildMode = NOT_AVAILABLE;
+          }
+          break;
 
-  public boolean hasAllowedOperatorsOnly() {
-    return hasAllowedOperatorsOnly;
-  }
+        case MIN:
+        case MAX:
+          incrementalRebuildMode = INSERT_ONLY;

Review Comment:
   If we have a mix of aggregate functions, some of which are incrementally 
maintainable (e.g. MIN/MAX) and some of which are not (e.g. VARIANCE), it looks 
like `incrementalRebuildMode` will get overwritten by the last aggregate call 
that is processed in the for loop. If any of the aggregate functions qualifies 
for NOT_AVAILABLE, then we should treat the MV as not eligible for 
incremental refresh.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    if (hiveTable.getHiveTableMD().getStorageHandler() != null) {
+      if 
(hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        return new Result(INSERT_ONLY);
+      } else {
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    int countStarIndex = leftResult.countIndex;
+    if (countStarIndex == -1 && rightResult.countIndex != -1) {
+      countStarIndex = rightResult.countIndex + 
join.getLeft().getRowType().getFieldCount();
+    }
+    switch (rightResult.incrementalRebuildMode) {
+      case INSERT_ONLY:
+        return new Result(INSERT_ONLY, containsAggregate, countStarIndex);
+      case AVAILABLE:
+        return new Result(
+            leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : 
AVAILABLE,
+            containsAggregate,
+            countStarIndex);
+      case NOT_AVAILABLE:
+      default:
+        return new Result(rightResult.incrementalRebuildMode, 
containsAggregate, countStarIndex);
+    }
+  }
+
+  public Result visit(HiveAggregate aggregate) {
+    Result result = visitChildOf(aggregate);
+    if (result.incrementalRebuildMode != AVAILABLE) {
+      return new Result(result.incrementalRebuildMode, true, -1);
+    }
+
+    Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new 
HashMap<>(aggregate.getRowType().getFieldCount());
+
+    int countStarIndex = -1;
     for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
       AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
-      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().size() == 0) {
-        hasCountStar = true;
-        break;
+      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().isEmpty()) {
+        countStarIndex = i;
+        continue;
+      }
+
+      for (Integer argIndex : aggregateCall.getArgList()) {
+        columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new 
HashSet<>());
+        Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex);
+        aggregates.add(aggregateCall.getAggregation().getKind());
       }
     }
-  }
 
-  /**
-   * Starts an iteration.
-   */
-  public RelNode go(RelNode p) {
-    visit(p, 0, null);
-    return p;
-  }
+    IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? 
INSERT_ONLY : AVAILABLE;
+    for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
+      AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
+      switch (aggregateCall.getAggregation().getKind()) {
+        case COUNT:
+        case SUM:
+        case SUM0:
+          break;
 
-  public boolean isContainsAggregate() {
-    return containsAggregate;
-  }
+        case AVG:
+          Set<SqlKind> aggregates = 
columnRefByAggregateCall.get(aggregateCall.getArgList().get(0));
+          if (!(aggregates.contains(SqlKind.SUM) && 
aggregates.contains(SqlKind.COUNT))) {
+            incrementalRebuildMode = NOT_AVAILABLE;
+          }
+          break;
 
-  public boolean hasAllowedOperatorsOnly() {
-    return hasAllowedOperatorsOnly;
-  }
+        case MIN:
+        case MAX:
+          incrementalRebuildMode = INSERT_ONLY;
+          break;
+
+        default:
+          incrementalRebuildMode = NOT_AVAILABLE;
+          break;
+      }
+    }
 
-  public boolean isInsertAllowedOnly() {
-    return insertAllowedOnly;
+    return new Result(incrementalRebuildMode, true, countStarIndex);
   }
 
-  public boolean hasCountStar() {
-    return hasCountStar;
+  public static final class Result {
+    private final IncrementalRebuildMode incrementalRebuildMode;
+    private final boolean containsAggregate;
+    private final int countIndex;

Review Comment:
   nit: rename this to countStarIndex so that we don't confuse it with the 
index of a non-count-star aggregate.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/TestMaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMaterializedViewIncrementalRewritingRelVisitor extends 
TestRuleBase {
+
+  @Test
+  public void 
testIncrementalRebuildIsNotAvailableWhenPlanHasUnsupportedOperator() {
+    RelNode ts1 = createTS(t1NativeMock, "t1");
+
+    RelNode mvQueryPlan = REL_BUILDER
+        .push(ts1)
+        .sort(1) // Order by is not supported
+        .build();
+
+    MaterializedViewIncrementalRewritingRelVisitor visitor = new 
MaterializedViewIncrementalRewritingRelVisitor();
+    assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), 
is(IncrementalRebuildMode.NOT_AVAILABLE));
+  }
+
+  @Test
+  public void 
testIncrementalRebuildIsInsertOnlyWhenPlanHasTSOnNonNativeTable() {
+    RelNode ts1 = createT2IcebergTS();
+
+    RelNode mvQueryPlan = REL_BUILDER
+        .push(ts1)
+        .build();
+
+    MaterializedViewIncrementalRewritingRelVisitor visitor = new 
MaterializedViewIncrementalRewritingRelVisitor();
+    assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), 
is(IncrementalRebuildMode.NOT_AVAILABLE));
+  }
+
+  @Test
+  public void 
testIncrementalRebuildIsInsertOnlyWhenPlanHasTSOnNonNativeTableSupportsSnapshots()
 {
+    doReturn(true).when(table2storageHandler).areSnapshotsSupported();
+    RelNode ts1 = createT2IcebergTS();
+
+    RelNode mvQueryPlan = REL_BUILDER
+        .push(ts1)
+        .build();
+
+    MaterializedViewIncrementalRewritingRelVisitor visitor = new 
MaterializedViewIncrementalRewritingRelVisitor();
+    assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), 
is(IncrementalRebuildMode.INSERT_ONLY));
+  }
+
+  @Test
+  public void testIncrementalRebuildIsInsertOnlyWhenPlanHasFilter() {
+    RelNode ts1 = createTS(t1NativeMock, "t1");
+
+    RelNode mvQueryPlan = REL_BUILDER
+        .push(ts1)
+        .filter(REX_BUILDER.makeCall(SqlStdOperatorTable.IS_NOT_NULL, 
REX_BUILDER.makeInputRef(ts1, 0)))
+        .build();
+
+    MaterializedViewIncrementalRewritingRelVisitor visitor = new 
MaterializedViewIncrementalRewritingRelVisitor();
+    MaterializedViewIncrementalRewritingRelVisitor.Result result = 
visitor.go(mvQueryPlan);
+
+    assertThat(result.getIncrementalRebuildMode(), 
is(IncrementalRebuildMode.INSERT_ONLY));
+    assertThat(result.containsAggregate(), is(false));
+  }
+
+  @Test
+  public void testIncrementalRebuildIsInsertOnlyWhenPlanHasInnerJoin() {
+    RelNode ts1 = createTS(t1NativeMock, "t1");
+    RelNode ts2 = createTS(t2NativeMock, "t2");
+
+    RexNode joinCondition = REX_BUILDER.makeCall(SqlStdOperatorTable.EQUALS,
+        
REX_BUILDER.makeInputRef(ts1.getRowType().getFieldList().get(0).getType(), 0),
+        
REX_BUILDER.makeInputRef(ts2.getRowType().getFieldList().get(0).getType(), 5));
+
+    RelNode mvQueryPlan = REL_BUILDER
+        .push(ts1)
+        .push(ts2)
+        .join(JoinRelType.INNER, joinCondition)
+        .build();
+
+    MaterializedViewIncrementalRewritingRelVisitor visitor = new 
MaterializedViewIncrementalRewritingRelVisitor();
+    assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), 
is(IncrementalRebuildMode.INSERT_ONLY));
+  }
+
+  @Test
+  public void 
testIncrementalRebuildIsNotAvailableWhenPlanHasJoinOtherThanInner() {
+    RelNode ts1 = createTS(t1NativeMock, "t1");
+    RelNode ts2 = createTS(t2NativeMock, "t2");
+
+    RexNode joinCondition = REX_BUILDER.makeCall(SqlStdOperatorTable.EQUALS,
+        
REX_BUILDER.makeInputRef(ts1.getRowType().getFieldList().get(0).getType(), 0),
+        
REX_BUILDER.makeInputRef(ts2.getRowType().getFieldList().get(0).getType(), 5));
+
+    RelNode mvQueryPlan = REL_BUILDER
+        .push(ts1)
+        .push(ts2)
+        .join(JoinRelType.LEFT, joinCondition)
+        .build();
+
+    MaterializedViewIncrementalRewritingRelVisitor visitor = new 
MaterializedViewIncrementalRewritingRelVisitor();
+    assertThat(visitor.go(mvQueryPlan).getIncrementalRebuildMode(), 
is(IncrementalRebuildMode.NOT_AVAILABLE));
+  }
+
+  @Test
+  public void testIncrementalRebuildIsInsertOnlyWhenPlanHasAggregate() {

Review Comment:
   I notice that all the tests with an aggregate have a grouping key. Can we 
also add a couple of tests with plain (non-grouped) aggregates? Those should 
also be incrementally maintainable as long as they satisfy the other criteria. 
For a plain aggregate, I believe we will still need a COUNT(*) to check whether 
a DELETE on the source table deletes all the rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to