zabetak commented on code in PR #4864:
URL: https://github.com/apache/hive/pull/4864#discussion_r1400385552


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,48 @@ public static <T> Set<T> findOperatorsUpstreamJoinAccounted(Operator<?> start, C
     return found;
   }
 
+  /**
+   * Check whether there are more operators in the specified operator tree branch than the given limit
+   * until a ReduceSinkOperator is reached.
+   * The method traverses the parent operators of the specified root operator in dept first manner.
+   * @param start root of the operator tree to check
+   * @param opClazz type of operator to track
+   * @param limit maximum allowed number of operator in a branch of the tree
+   * @return true if limit is exceeded false otherwise

Review Comment:
   I am not a native English speaker, but the Javadoc here, along with the name of the method, is a bit misleading.
   
   I would interpret "ops more than ... limit" as `|ops| > limit`, not as `|ops| >= limit`, which is how the method currently behaves.
   Similarly, I would consider that "limit is exceeded" means greater than (`>`), not equal.
   
   Can we make the method return true only when the number of operators is strictly greater than the limit? In the production code, where the method is used, I think things should still work fine if we call `hasMoreOperatorsThan(grandParent, GroupByOperator.class, 1)`.
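Below is a minimal, self-contained sketch of the strict (`>`) semantics suggested above. `OpNode`, `StrictLimitSketch`, and the string type tags (`"GBY"`, `"RS"`) are hypothetical stand-ins for Hive's `Operator<?>` hierarchy and are not Hive API. The sketch assumes the method counts tracked operators along each upward path and stops at a ReduceSink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Hive's Operator<?>: a type tag plus parent links.
final class OpNode {
  final String type;
  final List<OpNode> parents;

  OpNode(String type, OpNode... parents) {
    this.type = type;
    this.parents = new ArrayList<>(Arrays.asList(parents));
  }
}

final class StrictLimitSketch {
  // True only when some upward path holds strictly more than `limit`
  // operators of `trackedType` (count > limit, not count >= limit).
  static boolean hasMoreOperatorsThan(OpNode start, String trackedType, int limit) {
    return maxCountOnPath(start, trackedType) > limit;
  }

  // Largest number of `trackedType` operators on any single upward path
  // from `op`; the walk stops at (and does not count) a ReduceSink ("RS").
  static int maxCountOnPath(OpNode op, String trackedType) {
    if (op.type.equals("RS")) {
      return 0;
    }
    int best = 0;
    for (OpNode parent : op.parents) {
      best = Math.max(best, maxCountOnPath(parent, trackedType));
    }
    return best + (op.type.equals(trackedType) ? 1 : 0);
  }
}
```

Take the branch `RS - GBY - FIL - SEL - GBY`. The path holds two GBYs, so a limit of 1 reports "exceeded" and a limit of 2 does not. This matches reading "exceeded" as strictly greater.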



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,48 @@ public static <T> Set<T> findOperatorsUpstreamJoinAccounted(Operator<?> start, C
     return found;
   }
 
+  /**
+   * Check whether there are more operators in the specified operator tree branch than the given limit
+   * until a ReduceSinkOperator is reached.
+   * The method traverses the parent operators of the specified root operator in dept first manner.
+   * @param start root of the operator tree to check
+   * @param opClazz type of operator to track
+   * @param limit maximum allowed number of operator in a branch of the tree
+   * @return true if limit is exceeded false otherwise
+   * @param <T> type of operator to track
+   */
+  public static <T> boolean hasMoreOperatorsThan(
+      Operator<?> start, Class<T> opClazz, int limit) {
+    return hasMoreOperatorsThan(start, opClazz, limit, new HashSet<>());
+  }
+
+  private static <T> boolean hasMoreOperatorsThan(
+      Operator<?> start, Class<T> opClazz, int limit, Set<Operator<?>> visited) {
+    if (visited.contains(start)) {

Review Comment:
   I don't see where the `visited` set is populated. Is the cycle detection logic working?
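The guard can only work if the set is written as well as read. A set that is only ever read never makes `contains` return true. Below is one possible shape, sketched with hypothetical `GraphNode`/`VisitedSetSketch` types rather than Hive's classes.

The node is added on entry and removed on exit. This is an assumption that per-path counting is what the method wants: a single global set, never cleared, could instead skip a shared parent on the second path and under-count.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Hive's Operator<?>; parents are mutable so a
// cycle can be built for demonstration.
final class GraphNode {
  final String type;
  final List<GraphNode> parents = new ArrayList<>();

  GraphNode(String type) {
    this.type = type;
  }
}

final class VisitedSetSketch {
  static int maxCountOnPath(GraphNode start, String trackedType) {
    return maxCountOnPath(start, trackedType, new HashSet<>());
  }

  // `onPath` holds the nodes on the current DFS path. A node is added on
  // entry and removed on exit, so a parent reached again through another
  // branch is still counted, while a node already on the path (a cycle)
  // stops the walk instead of recursing forever.
  private static int maxCountOnPath(GraphNode op, String trackedType, Set<GraphNode> onPath) {
    if (op.type.equals("RS") || !onPath.add(op)) {
      return 0;
    }
    try {
      int best = 0;
      for (GraphNode parent : op.parents) {
        best = Math.max(best, maxCountOnPath(parent, trackedType, onPath));
      }
      return best + (op.type.equals(trackedType) ? 1 : 0);
    } finally {
      onPath.remove(op);
    }
  }
}
```

On a two-node cycle of GBYs the walk terminates with a count of 2. In a diamond where two branches share a GBY parent, that parent is counted on both paths.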



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,37 @@ public static <T> Set<T> findOperatorsUpstreamJoinAccounted(Operator<?> start, C
     return found;
   }
 
+  /**
+   * Check whether there are more operators in the specified operator tree branch than the given limit
+   * @param start root of the operator tree to check
+   * @param opClazz type of operator to track
+   * @param limit maximum allowed number of operator in a branch of the tree
+   * @return true of limit is exceeded false otherwise
+   * @param <T> type of operator to track
+   */
+  public static <T> boolean hasMoreOperatorsThan(Operator<?> start, Class<T> opClazz, int limit) {

Review Comment:
   Actually, I missed the fact that the method was counting operators along a certain **path** between nodes. I had the impression that just counting GBY operators reachable from a starting point was sufficient.
   ```
   RS - GBY1 - GBY2 - FIL1 - \
                             JOIN
   RS - GBY3 - FIL2 - - - - -/
   ```
   Since we are calling the method on the inputs of the join (i.e., FIL1, FIL2) and not on the join itself, I am not sure why we would need to backtrack and restart counting.
   
   Anyway, since we have mostly finalized the discussion on `hasMoreOperatorsThan`, it may not be worth jumping into implementing a new method.
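The difference between the two readings can be made concrete on the diagram above. The sketch uses hypothetical `Vertex`/`PathVsTotalSketch` types, not Hive classes. It compares the total number of GBYs reachable upward (stopping at RS) with the largest number of GBYs on any single path.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Hive's Operator<?>.
final class Vertex {
  final String type;
  final List<Vertex> parents;

  Vertex(String type, Vertex... parents) {
    this.type = type;
    this.parents = Arrays.asList(parents);
  }
}

final class PathVsTotalSketch {
  // Total number of `tracked` operators reachable upward from `op`,
  // stopping at ReduceSinks; each node is counted at most once.
  static int total(Vertex op, String tracked) {
    return total(op, tracked, new HashSet<>());
  }

  private static int total(Vertex op, String tracked, Set<Vertex> seen) {
    if (op.type.equals("RS") || !seen.add(op)) {
      return 0;
    }
    int n = op.type.equals(tracked) ? 1 : 0;
    for (Vertex p : op.parents) {
      n += total(p, tracked, seen);
    }
    return n;
  }

  // Largest number of `tracked` operators on any single upward path.
  static int maxOnPath(Vertex op, String tracked) {
    if (op.type.equals("RS")) {
      return 0;
    }
    int best = 0;
    for (Vertex p : op.parents) {
      best = Math.max(best, maxOnPath(p, tracked));
    }
    return best + (op.type.equals(tracked) ? 1 : 0);
  }
}
```

On the JOIN itself the two measures differ (3 total versus 2 on the longest path). On each join input (FIL1, FIL2) they agree, which is the observation behind the question above.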



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,37 @@ public static <T> Set<T> findOperatorsUpstreamJoinAccounted(Operator<?> start, C
     return found;
   }
 
+  /**
+   * Check whether there are more operators in the specified operator tree branch than the given limit
+   * @param start root of the operator tree to check
+   * @param opClazz type of operator to track
+   * @param limit maximum allowed number of operator in a branch of the tree
+   * @return true of limit is exceeded false otherwise
+   * @param <T> type of operator to track

Review Comment:
   The `T` type parameter is used only once in the method declaration and nowhere else, so it could be replaced with `?` without sacrificing type safety. Anyway, choosing `?` vs `T` (when they mean the same thing) is quite subjective (https://stackoverflow.com/questions/18142009/type-parameter-vs-unbounded-wildcard), so I am fine leaving it as it is.
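To illustrate the point, here are two signatures that accept exactly the same arguments. The method names are hypothetical and not part of the PR. Because `T` appears only once, it adds no constraint, and the wildcard form is equivalent.

```java
final class WildcardVsTypeParam {
  // Type-parameter form: T is declared but referenced only by opClazz.
  static <T> boolean isInstanceOf(Object op, Class<T> opClazz) {
    return opClazz.isInstance(op);
  }

  // Wildcard form: no type variable, same set of accepted calls.
  static boolean isInstanceOfWildcard(Object op, Class<?> opClazz) {
    return opClazz.isInstance(op);
  }
}
```

A type parameter only starts to matter once it links two positions in the signature, for example a `Class<T>` argument and a `T` or `List<T>` return value.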



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java:
##########
@@ -755,6 +757,20 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
         LOG.debug("External table {} found in join and also could not provide statistics - disabling SMB join.", sb);
         return false;
       }
+      // GBY operators buffers one record. These are processed when ReduceRecordSources flushes the operator tree
+      // when end of record stream reached. If the tree has more than two GBY operators CommonMergeJoinOperator can
+      // not process all buffered records.
+      // HIVE-27788
+      if (parentOp.getParentOperators() != null) {
+        for (Operator<?> grandParent : parentOp.getParentOperators()) {

Review Comment:
   Thanks for the clarification. Maybe leave a comment or an assertion stating that parentOp is always an RS.
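If an explicit check is preferred over a comment, it could look roughly like the sketch below. `Op`, `ReduceSinkOp`, and `checkParentIsReduceSink` are hypothetical stand-ins, not Hive API. A plain `assert parentOp instanceof ReduceSinkOperator` would also document the invariant, but Java `assert` statements only run when the JVM is started with `-ea`. The check below runs unconditionally.

```java
// Hypothetical stand-ins for Hive's Operator<?> and ReduceSinkOperator.
class Op {}

final class ReduceSinkOp extends Op {}

final class ParentIsReduceSinkSketch {
  // Makes the "parentOp is always an RS" invariant explicit and fails fast
  // with a descriptive message if it is ever violated.
  static void checkParentIsReduceSink(Op parentOp) {
    if (!(parentOp instanceof ReduceSinkOp)) {
      throw new IllegalStateException(
          "Expected a ReduceSink as join input, found " + parentOp.getClass().getSimpleName());
    }
  }
}
```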



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java:
##########
@@ -755,6 +757,20 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
         LOG.debug("External table {} found in join and also could not provide statistics - disabling SMB join.", sb);
         return false;
       }
+      // GBY operators buffers one record. These are processed when ReduceRecordSources flushes the operator tree
+      // when end of record stream reached. If the tree has more than two GBY operators CommonMergeJoinOperator can
+      // not process all buffered records.
+      // HIVE-27788
+      if (parentOp.getParentOperators() != null) {
+        for (Operator<?> grandParent : parentOp.getParentOperators()) {
+          if (hasMoreOperatorsThan(grandParent, GroupByOperator.class, 2)) {
+            LOG.info("We cannot convert to SMB join " +
+                "because one of the join branches has more than one Group by operators in the same reducer.");

Review Comment:
   Interestingly, the log message says "more than one ..." but the method call at the moment is `hasMoreOperatorsThan(..., 2)`.
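This assumes the strict `>` semantics proposed earlier in this review. Under that assumption, one way to keep the log text and the threshold aligned is to build the message from the same value the check uses. `MAX_GBY_PER_BRANCH` and `rejectionMessage` are hypothetical names used only for illustration.

```java
final class RejectionMessageSketch {
  // Hypothetical constant: the single source for both the threshold passed
  // to the check and the number printed in the log message.
  static final int MAX_GBY_PER_BRANCH = 1;

  static String rejectionMessage(int limit) {
    return String.format(
        "We cannot convert to SMB join because one of the join branches "
            + "has more than %d Group By operator(s) in the same reducer.",
        limit);
  }
}
```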



##########
ql/src/test/queries/clientpositive/auto_sortmerge_join_17.q:
##########
@@ -0,0 +1,20 @@
+CREATE TABLE tbl1_n5(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+insert into tbl1_n5(key, value)
+values
+(0, 'val_0'),
+(2, 'val_2'),
+(9, 'val_9');
+
+explain
+SELECT t1.key from
+(SELECT  key , row_number() over(partition by key order by value desc) as rk from tbl1_n5) t1
+join
+( SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key) t2
+on t1.key = t2.key where rk = 1;
+
+SELECT t1.key from
+(SELECT  key , row_number() over(partition by key order by value desc) as rk from tbl1_n5) t1

Review Comment:
   I was thinking that QA could be reduced to something like QB and still reproduce the problem, but I guess I am missing something. If that's not possible, then keep the repro as it is.
   ```sql
   -- QA
   SELECT key , row_number() over(partition by key order by value desc) as rk from tbl1_n5
   -- QB
   SELECT key FROM tbl1_n5 GROUP BY key
   ```



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorUtils.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.*;
+
+class TestOperatorUtils {
+  @Test
+  void testHasMoreGBYsReturnsTrueWhenLimitIs0() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+    Operator<?> limit = OperatorFactory.get(context, LimitDesc.class);
+    filter.setParentOperators(singletonList(limit));
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    limit.setParentOperators(singletonList(select));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    select.setParentOperators(singletonList(rs));
+
+    assertTrue(OperatorUtils.hasMoreOperatorsThan(filter, GroupByOperator.class, 0));
+  }
+
+  @Test
+  void testHasMoreGBYsReturnsFalseWhenNoGBYInBranchAndLimitIsMoreThan0() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+    Operator<?> limit = OperatorFactory.get(context, LimitDesc.class);
+    filter.setParentOperators(singletonList(limit));
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    limit.setParentOperators(singletonList(select));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    select.setParentOperators(singletonList(rs));
+
+    assertFalse(OperatorUtils.hasMoreOperatorsThan(filter, GroupByOperator.class, 2));
+  }
+
+  @Test
+  void testHasMoreGBYsReturnsFalseWhenNumberOfGBYIsLessThanLimit() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+    Operator<?> limit = OperatorFactory.get(context, LimitDesc.class);
+    filter.setParentOperators(singletonList(limit));
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    limit.setParentOperators(singletonList(select));
+    Operator<?> gby = OperatorFactory.get(context, GroupByDesc.class);
+    select.setParentOperators(singletonList(gby));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    gby.setParentOperators(singletonList(rs));
+
+    assertFalse(OperatorUtils.hasMoreOperatorsThan(filter, GroupByOperator.class, 2));
+  }
+
+  @Test
+  void testHasMoreGBYsReturnsTrueWhenNumberOfGBYIsEqualsWithLimit() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    gby1.setParentOperators(singletonList(select));
+    Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+    select.setParentOperators(singletonList(filter));
+    Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+    filter.setParentOperators(singletonList(gby2));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    gby2.setParentOperators(singletonList(rs));
+
+    assertTrue(OperatorUtils.hasMoreOperatorsThan(gby1, GroupByOperator.class, 2));
+  }
+
+  @Test
+  void testHasMoreGBYsReturnsFalseWhenNumberOfGBYIsEqualsWithLimitButHasAnRSInTheMiddle() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    gby1.setParentOperators(singletonList(select));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    select.setParentOperators(singletonList(rs));
+    Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+    rs.setParentOperators(singletonList(gby2));
+    Operator<?> ts = OperatorFactory.get(context, TableScanDesc.class);
+    gby2.setParentOperators(singletonList(ts));
+
+    assertFalse(OperatorUtils.hasMoreOperatorsThan(gby1, GroupByOperator.class, 2));
+  }
+
+  @Test
+  void testHasMoreGBYsReturnsTrueWhenBranchHasJoinAndNumberOfGBYIsEqualsWithLimit() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+    Operator<?> join = OperatorFactory.get(context, CommonMergeJoinDesc.class);
+    gby1.setParentOperators(singletonList(join));
+
+    // Branch #1 has the second GBY
+    Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+    Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+    filter.setParentOperators(singletonList(gby2));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    gby2.setParentOperators(singletonList(rs));
+
+    // Branch #2
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    Operator<?> rs2 = OperatorFactory.get(context, ReduceSinkDesc.class);
+    select.setParentOperators(singletonList(rs2));
+
+    join.setParentOperators(asList(filter, select));
+
+    assertTrue(OperatorUtils.hasMoreOperatorsThan(gby1, GroupByOperator.class, 2));
+  }
+
+  @Test
+  void testHasMoreGBYsReturnsFalseWhenBranchHasJoinAndBothJoinBranchesHasLessGBYThanLimit() {
+    CompilationOpContext context = new CompilationOpContext();
+    Operator<?> join = OperatorFactory.get(context, CommonMergeJoinDesc.class);
+
+    // Branch #1 has the second GBY
+    Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+    Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+    filter.setParentOperators(singletonList(gby1));
+    Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+    gby1.setParentOperators(singletonList(rs));
+
+    // Branch #2
+    Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+    Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+    select.setParentOperators(singletonList(gby2));
+    Operator<?> rs2 = OperatorFactory.get(context, ReduceSinkDesc.class);
+    gby2.setParentOperators(singletonList(rs2));
+
+    join.setParentOperators(asList(filter, select));
+
+    assertFalse(OperatorUtils.hasMoreOperatorsThan(join, GroupByOperator.class, 2));

Review Comment:
   OK, I missed the fact that we were interested in the number of operators on a root-to-leaf path only. As I wrote elsewhere, I thought we just cared about the total number of "certain" operators in the sub-tree.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.