-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23270/#review47707
-----------------------------------------------------------
ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
<https://reviews.apache.org/r/23270/#comment83868>

    What does flush do?


ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
<https://reviews.apache.org/r/23270/#comment83867>

    Why remove this method? The rows in a key group are sorted by tags, so when we see a new tag we can call endGroup for the operators that have smaller tags. Also, the JoinOperator assumes that the rows are sorted by tags. I think we need this method to make sure that, for the optimized plan, the JoinOperator still gets rows sorted by tags within a key group (see the sketch after these comments).


ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
<https://reviews.apache.org/r/23270/#comment83873>

    Do we need this?


ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
<https://reviews.apache.org/r/23270/#comment83877>

    It seems the logic here checks whether we are processing the last alias of this JoinOperator. Because endGroupIfNecessary is removed in this patch, rows within a key group may no longer be sorted by tags. I am not sure this is what we want, because the behavior of the JoinOperator for a plan optimized by the correlation optimizer may differ from its behavior for a non-optimized plan; in particular, the rightmost table may not be the streamed table in the optimized plan.


ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
<https://reviews.apache.org/r/23270/#comment83874>

    Do we need this?


ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
<https://reviews.apache.org/r/23270/#comment83872>

    Do we need this?


ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
<https://reviews.apache.org/r/23270/#comment83866>

    It seems we do not need this line, right?


ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
<https://reviews.apache.org/r/23270/#comment83869>

    Do we need this?


ql/src/test/queries/clientpositive/correlationoptimizer16.q
<https://reviews.apache.org/r/23270/#comment83870>

    I think correlationoptimizer8 is for cases with UNION ALL. Can we add these test queries to that file?


- Yin Huai
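The following is a minimal, self-contained sketch of the key-group contract mentioned in the DemuxOperator and JoinOperator comments above. It is not the actual Hive code; the class, field, and method names are hypothetical and only illustrate why rows within a key group need to stay sorted by tag if a parent is to end a child's group as soon as it sees a larger tag.

{code:java}
// Hypothetical sketch only -- not the actual DemuxOperator/endGroupIfNecessary
// implementation. Within one key group, rows arrive sorted by tag, so a tag
// change means the child owning the previous (smaller) tag has seen its last
// row for this key and its group can be ended immediately.
public class EndGroupIfNecessarySketch {

  /** Minimal stand-in for a child operator that tracks group boundaries. */
  static class Child {
    private final int tag;
    Child(int tag) { this.tag = tag; }
    void startGroup() { System.out.println("child " + tag + ": startGroup"); }
    void process(String row) { System.out.println("child " + tag + ": process " + row); }
    void endGroup() { System.out.println("child " + tag + ": endGroup"); }
  }

  private final Child[] childrenByTag;
  private int currentTag = -1;

  EndGroupIfNecessarySketch(int numTags) {
    childrenByTag = new Child[numTags];
    for (int i = 0; i < numTags; i++) {
      childrenByTag[i] = new Child(i);
    }
  }

  /** Forward one row of the current key group; rows must arrive sorted by tag. */
  void forward(int tag, String row) {
    if (tag != currentTag) {
      if (currentTag != -1) {
        // A new, larger tag: the child owning the previous tag will not see
        // more rows for this key, so end its group now.
        childrenByTag[currentTag].endGroup();
      }
      childrenByTag[tag].startGroup();
      currentTag = tag;
    }
    childrenByTag[tag].process(row);
  }

  /** Close out the key group once all of its rows have been forwarded. */
  void endKeyGroup() {
    if (currentTag != -1) {
      childrenByTag[currentTag].endGroup();
      currentTag = -1;
    }
  }

  public static void main(String[] args) {
    EndGroupIfNecessarySketch demux = new EndGroupIfNecessarySketch(2);
    // One key group whose rows are sorted by tag: tag 0 first, then tag 1.
    demux.forward(0, "count from subquery 1");
    demux.forward(1, "count from subquery 2");
    demux.endKeyGroup();
  }
}
{code}

With rows sorted by tag, child 0's group is fully ended before child 1's group starts; if the optimized plan no longer guarantees that order within a key group, a downstream operator that relies on it (as the JoinOperator comment above notes) can see group boundaries at the wrong time.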
On July 4, 2014, 12:15 a.m., Navis Ryu wrote:

> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23270/
> -----------------------------------------------------------
>
> (Updated July 4, 2014, 12:15 a.m.)
>
>
> Review request for hive.
>
>
> Bugs: HIVE-7205
>     https://issues.apache.org/jira/browse/HIVE-7205
>
>
> Repository: hive-git
>
>
> Description
> -------
>
> Use case:
>
> table TBL (a string, b string) contains a single row: 'a', 'a'
> The following query:
> {code:sql}
> select b, sum(cc) from (
>   select b, count(1) as cc from TBL group by b
>   union all
>   select a as b, count(1) as cc from TBL group by a
> ) z
> group by b
> {code}
> returns
>
>   a 1
>   a 1
>
> when set hive.optimize.correlation=true;
>
> If we change it to set hive.optimize.correlation=false;
> it returns the correct result: a 2
>
>
> The plan with correlation optimization:
> {code:sql}
> ABSTRACT SYNTAX TREE:
>   (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_UNION (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL a) b) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL a))))) z)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL cc)))) (TOK_GROUPBY (TOK_TABLE_OR_COL b))))
>
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 is a root stage
>
> STAGE PLANS:
>   Stage: Stage-1
>     Map Reduce
>       Alias -> Map Operator Tree:
>         null-subquery1:z-subquery1:TBL
>           TableScan
>             alias: TBL
>             Select Operator
>               expressions:
>                 expr: b
>                 type: string
>               outputColumnNames: b
>               Group By Operator
>                 aggregations:
>                   expr: count(1)
>                 bucketGroup: false
>                 keys:
>                   expr: b
>                   type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1
>                 Reduce Output Operator
>                   key expressions:
>                     expr: _col0
>                     type: string
>                   sort order: +
>                   Map-reduce partition columns:
>                     expr: _col0
>                     type: string
>                   tag: 0
>                   value expressions:
>                     expr: _col1
>                     type: bigint
>         null-subquery2:z-subquery2:TBL
>           TableScan
>             alias: TBL
>             Select Operator
>               expressions:
>                 expr: a
>                 type: string
>               outputColumnNames: a
>               Group By Operator
>                 aggregations:
>                   expr: count(1)
>                 bucketGroup: false
>                 keys:
>                   expr: a
>                   type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1
>                 Reduce Output Operator
>                   key expressions:
>                     expr: _col0
>                     type: string
>                   sort order: +
>                   Map-reduce partition columns:
>                     expr: _col0
>                     type: string
>                   tag: 1
>                   value expressions:
>                     expr: _col1
>                     type: bigint
>       Reduce Operator Tree:
>         Demux Operator
>           Group By Operator
>             aggregations:
>               expr: count(VALUE._col0)
>             bucketGroup: false
>             keys:
>               expr: KEY._col0
>               type: string
>             mode: mergepartial
>             outputColumnNames: _col0, _col1
>             Select Operator
>               expressions:
>                 expr: _col0
>                 type: string
>                 expr: _col1
>                 type: bigint
>               outputColumnNames: _col0, _col1
>               Union
>                 Select Operator
>                   expressions:
>                     expr: _col0
>                     type: string
>                     expr: _col1
>                     type: bigint
>                   outputColumnNames: _col0, _col1
>                   Mux Operator
>                     Group By Operator
>                       aggregations:
>                         expr: sum(_col1)
>                       bucketGroup: false
>                       keys:
>                         expr: _col0
>                         type: string
>                       mode: complete
>                       outputColumnNames: _col0, _col1
>                       Select Operator
>                         expressions:
>                           expr: _col0
>                           type: string
>                           expr: _col1
>                           type: bigint
>                         outputColumnNames: _col0, _col1
>                         File Output Operator
>                           compressed: false
>                           GlobalTableId: 0
>                           table:
>                             input format: org.apache.hadoop.mapred.TextInputFormat
>                             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>           Group By Operator
>             aggregations:
>               expr: count(VALUE._col0)
>             bucketGroup: false
>             keys:
>               expr: KEY._col0
>               type: string
>             mode: mergepartial
>             outputColumnNames: _col0, _col1
>             Select Operator
>               expressions:
>                 expr: _col0
>                 type: string
>                 expr: _col1
>                 type: bigint
>               outputColumnNames: _col0, _col1
>               Union
>                 Select Operator
>                   expressions:
>                     expr: _col0
>                     type: string
>                     expr: _col1
>                     type: bigint
>                   outputColumnNames: _col0, _col1
>                   Mux Operator
>                     Group By Operator
>                       aggregations:
>                         expr: sum(_col1)
>                       bucketGroup: false
>                       keys:
>                         expr: _col0
>                         type: string
>                       mode: complete
>                       outputColumnNames: _col0, _col1
>                       Select Operator
>                         expressions:
>                           expr: _col0
>                           type: string
>                           expr: _col1
>                           type: bigint
>                         outputColumnNames: _col0, _col1
>                         File Output Operator
>                           compressed: false
>                           GlobalTableId: 0
>                           table:
>                             input format: org.apache.hadoop.mapred.TextInputFormat
>                             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
> {code}
>
> The plan without correlation optimization:
>
> {code:sql}
> ABSTRACT SYNTAX TREE:
>   (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_UNION (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL a) b) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL a))))) z)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL cc)))) (TOK_GROUPBY (TOK_TABLE_OR_COL b))))
>
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-2 depends on stages: Stage-1, Stage-3
>   Stage-3 is a root stage
>   Stage-0 is a root stage
>
> STAGE PLANS:
>   Stage: Stage-1
>     Map Reduce
>       Alias -> Map Operator Tree:
>         null-subquery2:z-subquery2:TBL
>           TableScan
>             alias: TBL
>             Select Operator
>               expressions:
>                 expr: a
>                 type: string
>               outputColumnNames: a
>               Group By Operator
>                 aggregations:
>                   expr: count(1)
>                 bucketGroup: false
>                 keys:
>                   expr: a
>                   type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1
>                 Reduce Output Operator
>                   key expressions:
>                     expr: _col0
>                     type: string
>                   sort order: +
>                   Map-reduce partition columns:
>                     expr: _col0
>                     type: string
>                   tag: -1
>                   value expressions:
>                     expr: _col1
>                     type: bigint
>       Reduce Operator Tree:
>         Group By Operator
>           aggregations:
>             expr: count(VALUE._col0)
>           bucketGroup: false
>           keys:
>             expr: KEY._col0
>             type: string
>           mode: mergepartial
>           outputColumnNames: _col0, _col1
>           Select Operator
>             expressions:
>               expr: _col0
>               type: string
>               expr: _col1
>               type: bigint
>             outputColumnNames: _col0, _col1
>             File Output Operator
>               compressed: false
>               GlobalTableId: 0
>               table:
>                 input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>
>   Stage: Stage-2
>     Map Reduce
>       Alias -> Map Operator Tree:
>         maprfs:/user/hadoop/tmp/hive/hive_2014-06-10_15-42-37_188_2403850033480056671-5/-mr-10002
>           TableScan
>             Union
>               Select Operator
>                 expressions:
>                   expr: _col0
>                   type: string
>                   expr: _col1
>                   type: bigint
>                 outputColumnNames: _col0, _col1
>                 Group By Operator
>                   aggregations:
>                     expr: sum(_col1)
>                   bucketGroup: false
>                   keys:
>                     expr: _col0
>                     type: string
>                   mode: hash
>                   outputColumnNames: _col0, _col1
>                   Reduce Output Operator
>                     key expressions:
>                       expr: _col0
>                       type: string
>                     sort order: +
>                     Map-reduce partition columns:
>                       expr: _col0
>                       type: string
>                     tag: -1
>                     value expressions:
>                       expr: _col1
>                       type: bigint
>         maprfs:/user/hadoop/tmp/hive/hive_2014-06-10_15-42-37_188_2403850033480056671-5/-mr-10003
>           TableScan
>             Union
>               Select Operator
>                 expressions:
>                   expr: _col0
>                   type: string
>                   expr: _col1
>                   type: bigint
>                 outputColumnNames: _col0, _col1
>                 Group By Operator
>                   aggregations:
>                     expr: sum(_col1)
>                   bucketGroup: false
>                   keys:
>                     expr: _col0
>                     type: string
>                   mode: hash
>                   outputColumnNames: _col0, _col1
>                   Reduce Output Operator
>                     key expressions:
>                       expr: _col0
>                       type: string
>                     sort order: +
>                     Map-reduce partition columns:
>                       expr: _col0
>                       type: string
>                     tag: -1
>                     value expressions:
>                       expr: _col1
>                       type: bigint
>       Reduce Operator Tree:
>         Group By Operator
>           aggregations:
>             expr: sum(VALUE._col0)
>           bucketGroup: false
>           keys:
>             expr: KEY._col0
>             type: string
>           mode: mergepartial
>           outputColumnNames: _col0, _col1
>           Select Operator
>             expressions:
>               expr: _col0
>               type: string
>               expr: _col1
>               type: bigint
>             outputColumnNames: _col0, _col1
>             File Output Operator
>               compressed: false
>               GlobalTableId: 0
>               table:
>                 input format: org.apache.hadoop.mapred.TextInputFormat
>                 output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>
>   Stage: Stage-3
>     Map Reduce
>       Alias -> Map Operator Tree:
>         null-subquery1:z-subquery1:TBL
>           TableScan
>             alias: TBL
>             Select Operator
>               expressions:
>                 expr: b
>                 type: string
>               outputColumnNames: b
>               Group By Operator
>                 aggregations:
>                   expr: count(1)
>                 bucketGroup: false
>                 keys:
>                   expr: b
>                   type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1
>                 Reduce Output Operator
>                   key expressions:
>                     expr: _col0
>                     type: string
>                   sort order: +
>                   Map-reduce partition columns:
>                     expr: _col0
>                     type: string
>                   tag: -1
>                   value expressions:
>                     expr: _col1
>                     type: bigint
>       Reduce Operator Tree:
>         Group By Operator
>           aggregations:
>             expr: count(VALUE._col0)
>           bucketGroup: false
>           keys:
>             expr: KEY._col0
>             type: string
>           mode: mergepartial
>           outputColumnNames: _col0, _col1
>           Select Operator
>             expressions:
>               expr: _col0
>               type: string
>               expr: _col1
>               type: bigint
>             outputColumnNames: _col0, _col1
>             File Output Operator
>               compressed: false
>               GlobalTableId: 0
>               table:
>                 input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
> {code}
>
>
> Diffs
> -----
>
>   ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java 03194a4
>   ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java 772dda6
>   ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 1dde78e
>   ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java 792d87f
>   ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java 91b2369
>   ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java c747099
>   ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java e877cd4
>   ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b10a7fa
>   ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java db94271
>   ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java a63466a
>   ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java 58d1638
>   ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java e884afd
>   ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java c52f753
>   ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java 43458d9
>   ql/src/test/queries/clientpositive/correlationoptimizer16.q PRE-CREATION
>   ql/src/test/results/clientpositive/correlationoptimizer16.q.out PRE-CREATION
>
> Diff: https://reviews.apache.org/r/23270/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Navis Ryu
>