amansinha100 commented on code in PR #5096:
URL: https://github.com/apache/hive/pull/5096#discussion_r1515630621


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,255 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
 
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plan has aggregate</li>
+ *   <li>Whether the plan has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MaterializedViewIncrementalRewritingRelVisitor.class);
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    LOG.debug("Plan has unsupported operator {}", relNode);
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
+  }
+
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    Table hiveTableMD = hiveTable.getHiveTableMD();
+    if (hiveTableMD.getStorageHandler() != null) {
+      if (hiveTableMD.getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        LOG.debug("Table scan of non-native table {} with {} storage handler 
supports insert only materialized view" +
+            " incremental rebuild.",
+            hiveTableMD.getTableName(), 
hiveTableMD.getStorageHandler().getClass().getSimpleName());
+        return new Result(INSERT_ONLY);
+      } else {
+        LOG.debug("Unsupported table type: non-native table {} with storage 
handler {}",
+            hiveTableMD.getTableName(), 
hiveTableMD.getStorageHandler().getClass().getSimpleName());
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
+  }
+
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      LOG.debug("Unsupported join type {}", join.getJoinType());
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    if (leftResult.incrementalRebuildMode == NOT_AVAILABLE || 
rightResult.incrementalRebuildMode == NOT_AVAILABLE) {
+      return new Result(NOT_AVAILABLE, containsAggregate);
+    }
+    if (leftResult.incrementalRebuildMode == INSERT_ONLY || 
rightResult.incrementalRebuildMode == INSERT_ONLY) {
+      return new Result(INSERT_ONLY, containsAggregate);
+    }
+    return new Result(AVAILABLE, containsAggregate);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveAggregate aggregate) {
+    Result result = visitChildOf(aggregate);
+    if (result.incrementalRebuildMode == NOT_AVAILABLE) {
+      return new Result(result.incrementalRebuildMode, true, -1);
+    }
+
+    Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new 
HashMap<>(aggregate.getRowType().getFieldCount());
+
+    int countStarIndex = -1;
     for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
       AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
-      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().size() == 0) {
-        hasCountStar = true;
-        break;
+      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT &&
+          aggregateCall.getArgList().isEmpty() &&
+          !aggregateCall.isDistinct() &&
+          !aggregateCall.isApproximate()) {
+        countStarIndex = i;
+        continue;
+      }
+
+      for (Integer argIndex : aggregateCall.getArgList()) {
+        columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new 
HashSet<>());
+        Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex);
+        aggregates.add(aggregateCall.getAggregation().getKind());
       }
     }
-  }
 
-  /**
-   * Starts an iteration.
-   */
-  public RelNode go(RelNode p) {
-    visit(p, 0, null);
-    return p;
-  }
+    IncrementalRebuildMode incrementalRebuildMode =
+        result.incrementalRebuildMode == INSERT_ONLY || countStarIndex == -1 ? 
INSERT_ONLY : AVAILABLE;
+    LOG.debug("Initial incremental rebuild mode {} input's incremental rebuild 
mode {} count star index {}",
+        incrementalRebuildMode, result.incrementalRebuildMode, countStarIndex);
 
-  public boolean isContainsAggregate() {
-    return containsAggregate;
-  }
+    incrementalRebuildMode = updateBasedOnAggregates(aggregate, 
columnRefByAggregateCall, incrementalRebuildMode);
 
-  public boolean hasAllowedOperatorsOnly() {
-    return hasAllowedOperatorsOnly;
+    return new Result(incrementalRebuildMode, true, countStarIndex);
   }
 
-  public boolean isInsertAllowedOnly() {
-    return insertAllowedOnly;
+  private IncrementalRebuildMode updateBasedOnAggregates(
+      HiveAggregate aggregate,
+      Map<Integer, Set<SqlKind>> columnRefByAggregateCall,
+      IncrementalRebuildMode incrementalRebuildMode) {
+
+    for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
+      AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
+      switch (aggregateCall.getAggregation().getKind()) {
+        case COUNT:
+          if (aggregateCall.isDistinct() || aggregateCall.isApproximate()) {
+            LOG.debug("Unsupported aggregate function COUNT with distinct {} 
or approximate {}",
+                aggregateCall.isDistinct(), aggregateCall.isApproximate());
+            return NOT_AVAILABLE;
+          }
+
+        case SUM:

Review Comment:
   nit: since these two aggregate functions do not change the supplied 
incrementalRebuildMode, their cases could be removed here. One advantage of 
keeping them, though, is that it makes explicit that they support incremental 
rebuild.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to