zabetak commented on code in PR #4864:
URL: https://github.com/apache/hive/pull/4864#discussion_r1400385552
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,48 @@ public static <T> Set<T>
findOperatorsUpstreamJoinAccounted(Operator<?> start, C
return found;
}
+ /**
+ * Check whether there are more operators in the specified operator tree
branch than the given limit
+ * until a ReduceSinkOperator is reached.
+ * The method traverses the parent operators of the specified root operator
in dept first manner.
+ * @param start root of the operator tree to check
+ * @param opClazz type of operator to track
+ * @param limit maximum allowed number of operator in a branch of the tree
+ * @return true if limit is exceeded false otherwise
Review Comment:
I am not a native English speaker, but the Javadoc here, together with the
name of the method, is a bit misleading.
I would interpret "more ops than ... limit" as `|ops| > limit`, not as
`|ops| >= limit`, which is how the method currently behaves.
Similarly, I would take "limit is exceeded" to mean strictly greater than
(`>`), not greater than or equal.
Can we make the method return true only when the number of operators is
strictly greater than the limit? In the production code, where the method is
used, I think things should still work fine if we say
`hasMoreOperatorsThan(grandParent, GroupByOperator.class, 1)`.
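To make the suggested semantics concrete, here is a minimal sketch of a strictly-greater-than check. It does not use the real Hive classes: `Node`, the `"RS"`/`"GBY"` kind strings and `hasMoreThan` are hypothetical stand-ins, and stopping at the first RS (including when the start node itself is an RS) is my assumption about the intended behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class StrictLimitSketch {
  /** Hypothetical stand-in for a Hive operator; not the real Operator class. */
  static final class Node {
    final String kind;
    final List<Node> parents = new ArrayList<>();

    Node(String kind, Node... parents) {
      this.kind = kind;
      for (Node p : parents) {
        this.parents.add(p);
      }
    }
  }

  /**
   * Returns true only if some path from start up to the nearest "RS" node
   * contains strictly more than limit nodes of the given kind.
   */
  static boolean hasMoreThan(Node start, String kind, int limit) {
    return hasMoreThan(start, kind, limit, 0);
  }

  private static boolean hasMoreThan(Node node, String kind, int limit, int seen) {
    if (node.kind.equals("RS")) {
      return false; // assumed reducer boundary: stop counting on this path
    }
    if (node.kind.equals(kind)) {
      seen++;
    }
    if (seen > limit) { // strict comparison, so "equal to limit" is not enough
      return true;
    }
    for (Node parent : node.parents) {
      if (hasMoreThan(parent, kind, limit, seen)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // RS <- GBY <- FIL <- GBY (arrows point from child to parent)
    Node rs = new Node("RS");
    Node gby2 = new Node("GBY", rs);
    Node fil = new Node("FIL", gby2);
    Node gby1 = new Node("GBY", fil);
    System.out.println(hasMoreThan(gby1, "GBY", 1)); // true: 2 > 1
    System.out.println(hasMoreThan(gby1, "GBY", 2)); // false: 2 is not > 2
  }
}
```

With this reading, a branch with two GBY operators is reported for a limit of 1 and not for a limit of 2, which would also match the "more than one" wording of the log message.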
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,48 @@ public static <T> Set<T>
findOperatorsUpstreamJoinAccounted(Operator<?> start, C
return found;
}
+ /**
+ * Check whether there are more operators in the specified operator tree
branch than the given limit
+ * until a ReduceSinkOperator is reached.
+ * The method traverses the parent operators of the specified root operator
in dept first manner.
+ * @param start root of the operator tree to check
+ * @param opClazz type of operator to track
+ * @param limit maximum allowed number of operator in a branch of the tree
+ * @return true if limit is exceeded false otherwise
+ * @param <T> type of operator to track
+ */
+ public static <T> boolean hasMoreOperatorsThan(
+ Operator<?> start, Class<T> opClazz, int limit) {
+ return hasMoreOperatorsThan(start, opClazz, limit, new HashSet<>());
+ }
+
+ private static <T> boolean hasMoreOperatorsThan(
+ Operator<?> start, Class<T> opClazz, int limit, Set<Operator<?>>
visited) {
+ if (visited.contains(start)) {
Review Comment:
I don't see where the `visited` set is populated. Is the cycle detection
logic working?
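For reference, a common way to keep such a set populated is to rely on the return value of `Set.add`, which is false when the element is already present. The sketch below is only an illustration on a hypothetical `Node` class (not the Hive operator classes), and it counts reachable nodes rather than reproducing the method under review.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class VisitedSetSketch {
  /** Hypothetical graph node; identity-based equals/hashCode from Object. */
  static final class Node {
    final String name;
    final List<Node> parents = new ArrayList<>();

    Node(String name) {
      this.name = name;
    }
  }

  /** Counts the nodes reachable from start through parent links. */
  static int countReachable(Node start) {
    return countReachable(start, new HashSet<>());
  }

  private static int countReachable(Node node, Set<Node> visited) {
    // Set.add returns false when the element is already in the set,
    // so this single call both checks and populates the visited set.
    if (!visited.add(node)) {
      return 0;
    }
    int count = 1;
    for (Node parent : node.parents) {
      count += countReachable(parent, visited);
    }
    return count;
  }

  public static void main(String[] args) {
    Node a = new Node("a");
    Node b = new Node("b");
    Node c = new Node("c");
    a.parents.add(b);
    b.parents.add(c);
    c.parents.add(a); // deliberate cycle: a -> b -> c -> a
    System.out.println(countReachable(a)); // 3, and the recursion terminates
  }
}
```

If the set were only read with `contains` and never written, the same cyclic input would recurse until the stack overflows.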
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,37 @@ public static <T> Set<T>
findOperatorsUpstreamJoinAccounted(Operator<?> start, C
return found;
}
+ /**
+ * Check whether there are more operators in the specified operator tree
branch than the given limit
+ * @param start root of the operator tree to check
+ * @param opClazz type of operator to track
+ * @param limit maximum allowed number of operator in a branch of the tree
+ * @return true of limit is exceeded false otherwise
+ * @param <T> type of operator to track
+ */
+ public static <T> boolean hasMoreOperatorsThan(Operator<?> start, Class<T>
opClazz, int limit) {
Review Comment:
Actually, I missed the fact that the method was counting operators in a
certain **path** between nodes. I had the impression that just counting GBY
operators reachable from a starting point was sufficient.
```
RS - GBY1 - GBY2 - FIL1 - \
JOIN
RS - GBY3 - FIL2 - - - - -/
```
Since we are calling the method on the inputs of the join (i.e., FIL1, FIL2)
and not on the join itself, I am not sure why we would need to backtrack and
restart counting.
Anyway, since we have mostly finalized the discussion on
`hasMoreOperatorsThan`, it may not be worth jumping into implementing a new
method.
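To illustrate the difference between the two readings on the plan drawn above, here is a small sketch. `Node`, `maxOnAnyPath` and `totalReachable` are hypothetical names on a toy graph, not Hive APIs, and treating RS as the point where counting stops is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PathVsTotalSketch {
  /** Hypothetical stand-in for an operator with links to its parents. */
  static final class Node {
    final String kind;
    final List<Node> parents = new ArrayList<>();

    Node(String kind, Node... parents) {
      this.kind = kind;
      for (Node p : parents) {
        this.parents.add(p);
      }
    }
  }

  /** Largest number of nodes of the given kind on any single path up to an RS. */
  static int maxOnAnyPath(Node node, String kind) {
    if (node.kind.equals("RS")) {
      return 0;
    }
    int best = 0;
    for (Node parent : node.parents) {
      best = Math.max(best, maxOnAnyPath(parent, kind));
    }
    return best + (node.kind.equals(kind) ? 1 : 0);
  }

  /** Total number of nodes of the given kind reachable before hitting an RS. */
  static int totalReachable(Node start, String kind) {
    Set<Node> visited = new HashSet<>();
    Deque<Node> stack = new ArrayDeque<>();
    stack.push(start);
    int count = 0;
    while (!stack.isEmpty()) {
      Node node = stack.pop();
      if (node.kind.equals("RS") || !visited.add(node)) {
        continue; // stop at the boundary and skip nodes already counted
      }
      if (node.kind.equals(kind)) {
        count++;
      }
      for (Node parent : node.parents) {
        stack.push(parent);
      }
    }
    return count;
  }

  public static void main(String[] args) {
    // RS - GBY1 - GBY2 - FIL1 - JOIN and RS - GBY3 - FIL2 - JOIN
    Node gby1 = new Node("GBY", new Node("RS"));
    Node gby2 = new Node("GBY", gby1);
    Node fil1 = new Node("FIL", gby2);
    Node gby3 = new Node("GBY", new Node("RS"));
    Node fil2 = new Node("FIL", gby3);
    Node join = new Node("JOIN", fil1, fil2);

    System.out.println(maxOnAnyPath(join, "GBY"));   // 2
    System.out.println(totalReachable(join, "GBY")); // 3
    System.out.println(maxOnAnyPath(fil1, "GBY"));   // 2
    System.out.println(totalReachable(fil1, "GBY")); // 2
  }
}
```

In this toy plan the two counts differ only when starting from the join; starting from FIL1 or FIL2 they agree, which is the case I had in mind.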
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java:
##########
@@ -176,6 +176,37 @@ public static <T> Set<T>
findOperatorsUpstreamJoinAccounted(Operator<?> start, C
return found;
}
+ /**
+ * Check whether there are more operators in the specified operator tree
branch than the given limit
+ * @param start root of the operator tree to check
+ * @param opClazz type of operator to track
+ * @param limit maximum allowed number of operator in a branch of the tree
+ * @return true of limit is exceeded false otherwise
+ * @param <T> type of operator to track
Review Comment:
The `T` variable is used only once in the method declaration and nowhere
else, so it could be replaced with `?` without sacrificing type safety.
Anyway, choosing `?` vs `T` (when they mean the same thing) is quite subjective
(https://stackoverflow.com/questions/18142009/type-parameter-vs-unbounded-wildcard),
so I am fine leaving it as it is.
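For completeness, a tiny sketch of the two equivalent declarations. The class and method names are made up for illustration; only `Class.isInstance` is a real JDK call.

```java
public class WildcardSketch {
  /** Type-parameter form: T appears only once, in the parameter list. */
  static <T> boolean isInstanceGeneric(Object value, Class<T> clazz) {
    return clazz.isInstance(value);
  }

  /** Wildcard form: same behavior, no named type variable. */
  static boolean isInstanceWildcard(Object value, Class<?> clazz) {
    return clazz.isInstance(value);
  }

  public static void main(String[] args) {
    System.out.println(isInstanceGeneric("abc", String.class));   // true
    System.out.println(isInstanceWildcard("abc", String.class));  // true
    System.out.println(isInstanceWildcard("abc", Integer.class)); // false
  }
}
```

Callers cannot tell the two apart, which is why I consider the choice a matter of taste.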
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java:
##########
@@ -755,6 +757,20 @@ private boolean checkConvertJoinSMBJoin(JoinOperator
joinOp, OptimizeTezProcCont
LOG.debug("External table {} found in join and also could not provide
statistics - disabling SMB join.", sb);
return false;
}
+ // GBY operators buffers one record. These are processed when
ReduceRecordSources flushes the operator tree
+ // when end of record stream reached. If the tree has more than two GBY
operators CommonMergeJoinOperator can
+ // not process all buffered records.
+ // HIVE-27788
+ if (parentOp.getParentOperators() != null) {
+ for (Operator<?> grandParent : parentOp.getParentOperators()) {
Review Comment:
Thanks for the clarification. Maybe leave a comment or assertion that
parentOp is always an RS.
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java:
##########
@@ -755,6 +757,20 @@ private boolean checkConvertJoinSMBJoin(JoinOperator
joinOp, OptimizeTezProcCont
LOG.debug("External table {} found in join and also could not provide
statistics - disabling SMB join.", sb);
return false;
}
+ // GBY operators buffers one record. These are processed when
ReduceRecordSources flushes the operator tree
+ // when end of record stream reached. If the tree has more than two GBY
operators CommonMergeJoinOperator can
+ // not process all buffered records.
+ // HIVE-27788
+ if (parentOp.getParentOperators() != null) {
+ for (Operator<?> grandParent : parentOp.getParentOperators()) {
+ if (hasMoreOperatorsThan(grandParent, GroupByOperator.class, 2)) {
+ LOG.info("We cannot convert to SMB join " +
+ "because one of the join branches has more than one Group by
operators in the same reducer.");
Review Comment:
Interestingly, the log message says "more than one ..." but the call is
currently `hasMoreOperatorsThan(..., 2)`.
##########
ql/src/test/queries/clientpositive/auto_sortmerge_join_17.q:
##########
@@ -0,0 +1,20 @@
+CREATE TABLE tbl1_n5(key int, value string) CLUSTERED BY (key) SORTED BY (key)
INTO 2 BUCKETS;
+
+insert into tbl1_n5(key, value)
+values
+(0, 'val_0'),
+(2, 'val_2'),
+(9, 'val_9');
+
+explain
+SELECT t1.key from
+(SELECT key , row_number() over(partition by key order by value desc) as rk
from tbl1_n5) t1
+join
+( SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key) t2
+on t1.key = t2.key where rk = 1;
+
+SELECT t1.key from
+(SELECT key , row_number() over(partition by key order by value desc) as rk
from tbl1_n5) t1
Review Comment:
I was thinking that QA could be reduced to something like QB and still repro
the problem but I guess I am missing something. If that's not possible then
keep the repro as it is.
```sql
-- QA
SELECT key , row_number() over(partition by key order by value desc) as rk
from tbl1_n5
-- QB
SELECT key FROM tbl1_n5 GROUP BY key
```
##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorUtils.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.*;
+
+class TestOperatorUtils {
+ @Test
+ void testHasMoreGBYsReturnsTrueWhenLimitIs0() {
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+ Operator<?> limit = OperatorFactory.get(context, LimitDesc.class);
+ filter.setParentOperators(singletonList(limit));
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ limit.setParentOperators(singletonList(select));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ select.setParentOperators(singletonList(rs));
+
+ assertTrue(OperatorUtils.hasMoreOperatorsThan(filter,
GroupByOperator.class, 0));
+ }
+
+ @Test
+ void testHasMoreGBYsReturnsFalseWhenNoGBYInBranchAndLimitIsMoreThan0() {
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+ Operator<?> limit = OperatorFactory.get(context, LimitDesc.class);
+ filter.setParentOperators(singletonList(limit));
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ limit.setParentOperators(singletonList(select));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ select.setParentOperators(singletonList(rs));
+
+ assertFalse(OperatorUtils.hasMoreOperatorsThan(filter,
GroupByOperator.class, 2));
+ }
+
+ @Test
+ void testHasMoreGBYsReturnsFalseWhenNumberOfGBYIsLessThanLimit() {
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+ Operator<?> limit = OperatorFactory.get(context, LimitDesc.class);
+ filter.setParentOperators(singletonList(limit));
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ limit.setParentOperators(singletonList(select));
+ Operator<?> gby = OperatorFactory.get(context, GroupByDesc.class);
+ select.setParentOperators(singletonList(gby));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ gby.setParentOperators(singletonList(rs));
+
+ assertFalse(OperatorUtils.hasMoreOperatorsThan(filter,
GroupByOperator.class, 2));
+ }
+
+ @Test
+ void testHasMoreGBYsReturnsTrueWhenNumberOfGBYIsEqualsWithLimit() {
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ gby1.setParentOperators(singletonList(select));
+ Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+ select.setParentOperators(singletonList(filter));
+ Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+ filter.setParentOperators(singletonList(gby2));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ gby2.setParentOperators(singletonList(rs));
+
+ assertTrue(OperatorUtils.hasMoreOperatorsThan(gby1, GroupByOperator.class,
2));
+ }
+
+ @Test
+ void
testHasMoreGBYsReturnsFalseWhenNumberOfGBYIsEqualsWithLimitButHasAnRSInTheMiddle()
{
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ gby1.setParentOperators(singletonList(select));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ select.setParentOperators(singletonList(rs));
+ Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+ rs.setParentOperators(singletonList(gby2));
+ Operator<?> ts = OperatorFactory.get(context, TableScanDesc.class);
+ gby2.setParentOperators(singletonList(ts));
+
+ assertFalse(OperatorUtils.hasMoreOperatorsThan(gby1,
GroupByOperator.class, 2));
+ }
+
+ @Test
+ void
testHasMoreGBYsReturnsTrueWhenBranchHasJoinAndNumberOfGBYIsEqualsWithLimit() {
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+ Operator<?> join = OperatorFactory.get(context, CommonMergeJoinDesc.class);
+ gby1.setParentOperators(singletonList(join));
+
+ // Branch #1 has the second GBY
+ Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+ Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+ filter.setParentOperators(singletonList(gby2));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ gby2.setParentOperators(singletonList(rs));
+
+ // Branch #2
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ Operator<?> rs2 = OperatorFactory.get(context, ReduceSinkDesc.class);
+ select.setParentOperators(singletonList(rs2));
+
+ join.setParentOperators(asList(filter, select));
+
+ assertTrue(OperatorUtils.hasMoreOperatorsThan(gby1, GroupByOperator.class,
2));
+ }
+
+ @Test
+ void
testHasMoreGBYsReturnsFalseWhenBranchHasJoinAndBothJoinBranchesHasLessGBYThanLimit()
{
+ CompilationOpContext context = new CompilationOpContext();
+ Operator<?> join = OperatorFactory.get(context, CommonMergeJoinDesc.class);
+
+ // Branch #1 has the second GBY
+ Operator<?> filter = OperatorFactory.get(context, FilterDesc.class);
+ Operator<?> gby1 = OperatorFactory.get(context, GroupByDesc.class);
+ filter.setParentOperators(singletonList(gby1));
+ Operator<?> rs = OperatorFactory.get(context, ReduceSinkDesc.class);
+ gby1.setParentOperators(singletonList(rs));
+
+ // Branch #2
+ Operator<?> select = OperatorFactory.get(context, SelectDesc.class);
+ Operator<?> gby2 = OperatorFactory.get(context, GroupByDesc.class);
+ select.setParentOperators(singletonList(gby2));
+ Operator<?> rs2 = OperatorFactory.get(context, ReduceSinkDesc.class);
+ gby2.setParentOperators(singletonList(rs2));
+
+ join.setParentOperators(asList(filter, select));
+
+ assertFalse(OperatorUtils.hasMoreOperatorsThan(join,
GroupByOperator.class, 2));
Review Comment:
OK, I missed the fact that we were interested in the number of operators in
a root-to-leaf path only. As I wrote elsewhere, I thought we just cared about
the total number of "certain" operators in the sub-tree.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]