kasakrisz commented on code in PR #5096:
URL: https://github.com/apache/hive/pull/5096#discussion_r1512175945


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>

Review Comment:
   Fixed.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewIncrementalRewritingRelVisitor.java:
##########
@@ -17,97 +17,210 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.AVAILABLE;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.INSERT_ONLY;
+import static 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.IncrementalRebuildMode.NOT_AVAILABLE;
+
 /**
  * This class is a helper to check whether a materialized view rebuild
  * can be transformed from INSERT OVERWRITE to INSERT INTO.
  *
  * We are verifying that:
- *   1) Plan only uses legal operators (i.e., Filter, Project,
- *   Join, and TableScan)
- *   2) Whether the plane has aggregate
- *   3) Whether the plane has an count(*) aggregate function call
+ * <ul>
+ *   <li>Plan only uses legal operators (i.e., Filter, Project, Join, and 
TableScan)</li>
+ *   <li>Whether the plane has aggregate</li>
+ *   <li>Whether the plane has count(*) aggregate function call</li>
+ *   <li>Check whether aggregate functions are supported</li>
+ * </ul>
  */
-public class MaterializedViewIncrementalRewritingRelVisitor extends RelVisitor 
{
+public class MaterializedViewIncrementalRewritingRelVisitor implements 
ReflectiveVisitor {
 
-  private boolean containsAggregate;
-  private boolean hasAllowedOperatorsOnly;
-  private boolean hasCountStar;
-  private boolean insertAllowedOnly;
+  private final ReflectUtil.MethodDispatcher<Result> dispatcher;
 
   public MaterializedViewIncrementalRewritingRelVisitor() {
-    this.containsAggregate = false;
-    this.hasAllowedOperatorsOnly = true;
-    this.hasCountStar = false;
-    this.insertAllowedOnly = false;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        Result.class, this, "visit", RelNode.class);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public Result go(RelNode relNode) {
+    Result result = dispatcher.invoke(relNode);
+    if (result.containsAggregate) {
+      return result;
+    }
+
+    if (result.incrementalRebuildMode == AVAILABLE) {
+      // Incremental rebuild of non-aggregate MV is not supported when any 
source table has delete operations.
+      return new Result(INSERT_ONLY);
+    }
+
+    return result;
+  }
+
+  public Result visit(RelNode relNode) {
+    // Only TS, Filter, Join, Project and Aggregate are supported
+    return new Result(NOT_AVAILABLE);
+  }
+
+  private Result visitChildOf(RelNode rel) {
+    return visitChildOf(rel, 0);
   }
 
-  @Override
-  public void visit(RelNode node, int ordinal, RelNode parent) {
-    if (node instanceof Aggregate) {
-      this.containsAggregate = true;
-      check((Aggregate) node);
-      super.visit(node, ordinal, parent);
-    } else if (
-            node instanceof Filter ||
-            node instanceof Project ||
-            node instanceof Join) {
-      super.visit(node, ordinal, parent);
-    } else if (node instanceof TableScan) {
-      HiveTableScan scan = (HiveTableScan) node;
-      RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
-      if (hiveTable.getHiveTableMD().getStorageHandler() != null &&
-              
hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
+  private Result visitChildOf(RelNode rel, int index) {
+    return dispatcher.invoke(rel.getInput(index));
+  }
+
+  public Result visit(HiveTableScan scan) {
+    RelOptHiveTable hiveTable = (RelOptHiveTable) scan.getTable();
+
+    if (hiveTable.getHiveTableMD().getStorageHandler() != null) {
+      if 
(hiveTable.getHiveTableMD().getStorageHandler().areSnapshotsSupported()) {
         // Incremental rebuild of materialized views with non-native source 
tables are not implemented
         // when any of the source tables has delete/update operation since the 
last rebuild
-        insertAllowedOnly = true;
+        return new Result(INSERT_ONLY);
+      } else {
+        return new Result(NOT_AVAILABLE);
       }
-    } else {
-      hasAllowedOperatorsOnly = false;
     }
+
+    return new Result(AVAILABLE);
+  }
+
+  public Result visit(HiveProject project) {
+    return visitChildOf(project);
+  }
+
+  public Result visit(HiveFilter filter) {
+    return visitChildOf(filter);
   }
 
-  private void check(Aggregate aggregate) {
+  public Result visit(HiveJoin join) {
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return new Result(NOT_AVAILABLE);
+    }
+
+    Result leftResult = visitChildOf(join, 0);
+    Result rightResult = visitChildOf(join, 1);
+
+    boolean containsAggregate = leftResult.containsAggregate || 
rightResult.containsAggregate;
+    int countStarIndex = leftResult.countIndex;
+    if (countStarIndex == -1 && rightResult.countIndex != -1) {
+      countStarIndex = rightResult.countIndex + 
join.getLeft().getRowType().getFieldCount();
+    }
+    switch (rightResult.incrementalRebuildMode) {
+      case INSERT_ONLY:
+        return new Result(INSERT_ONLY, containsAggregate, countStarIndex);
+      case AVAILABLE:
+        return new Result(
+            leftResult.incrementalRebuildMode == INSERT_ONLY ? INSERT_ONLY : 
AVAILABLE,
+            containsAggregate,
+            countStarIndex);
+      case NOT_AVAILABLE:
+      default:
+        return new Result(rightResult.incrementalRebuildMode, 
containsAggregate, countStarIndex);
+    }
+  }
+
+  public Result visit(HiveAggregate aggregate) {
+    Result result = visitChildOf(aggregate);
+    if (result.incrementalRebuildMode != AVAILABLE) {
+      return new Result(result.incrementalRebuildMode, true, -1);
+    }
+
+    Map<Integer, Set<SqlKind>> columnRefByAggregateCall = new 
HashMap<>(aggregate.getRowType().getFieldCount());
+
+    int countStarIndex = -1;
     for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
       AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
-      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().size() == 0) {
-        hasCountStar = true;
-        break;
+      if (aggregateCall.getAggregation().getKind() == SqlKind.COUNT && 
aggregateCall.getArgList().isEmpty()) {
+        countStarIndex = i;
+        continue;
+      }
+
+      for (Integer argIndex : aggregateCall.getArgList()) {
+        columnRefByAggregateCall.computeIfAbsent(argIndex, integer -> new 
HashSet<>());
+        Set<SqlKind> aggregates = columnRefByAggregateCall.get(argIndex);
+        aggregates.add(aggregateCall.getAggregation().getKind());
       }
     }
-  }
 
-  /**
-   * Starts an iteration.
-   */
-  public RelNode go(RelNode p) {
-    visit(p, 0, null);
-    return p;
-  }
+    IncrementalRebuildMode incrementalRebuildMode = countStarIndex == -1 ? 
INSERT_ONLY : AVAILABLE;
+    for (int i = 0; i < aggregate.getAggCallList().size(); ++i) {
+      AggregateCall aggregateCall = aggregate.getAggCallList().get(i);
+      switch (aggregateCall.getAggregation().getKind()) {
+        case COUNT:
+        case SUM:
+        case SUM0:
+          break;
 
-  public boolean isContainsAggregate() {
-    return containsAggregate;
-  }
+        case AVG:
+          Set<SqlKind> aggregates = 
columnRefByAggregateCall.get(aggregateCall.getArgList().get(0));
+          if (!(aggregates.contains(SqlKind.SUM) && 
aggregates.contains(SqlKind.COUNT))) {
+            incrementalRebuildMode = NOT_AVAILABLE;
+          }
+          break;
 
-  public boolean hasAllowedOperatorsOnly() {
-    return hasAllowedOperatorsOnly;
-  }
+        case MIN:
+        case MAX:
+          incrementalRebuildMode = INSERT_ONLY;
+          break;
+
+        default:
+          incrementalRebuildMode = NOT_AVAILABLE;
+          break;
+      }
+    }
 
-  public boolean isInsertAllowedOnly() {
-    return insertAllowedOnly;
+    return new Result(incrementalRebuildMode, true, countStarIndex);
   }
 
-  public boolean hasCountStar() {
-    return hasCountStar;
+  public static final class Result {
+    private final IncrementalRebuildMode incrementalRebuildMode;
+    private final boolean containsAggregate;
+    private final int countIndex;

Review Comment:
   Renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to