HIVE-20703: Put dynamic sort partition optimization under cost based decision (Vineet Garg, reviewed by Prasanth Jayachandran, Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5eebbdf7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5eebbdf7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5eebbdf7 Branch: refs/heads/master Commit: 5eebbdf7c5750b31e1c43fe576fc0ab728bce05c Parents: 0d39a81 Author: Vineet Garg <[email protected]> Authored: Sun Oct 21 12:45:04 2018 -0700 Committer: Vineet Garg <[email protected]> Committed: Sun Oct 21 12:45:04 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 10 + .../insert_into_dynamic_partitions.q.out | 126 +- .../insert_overwrite_dynamic_partitions.q.out | 126 +- .../clientpositive/orc_format_part.q.out | 14 +- .../orc_nonstd_partitions_loc.q.out | 8 +- .../clientpositive/parquet_format_part.q.out | 14 +- .../parquet_nonstd_partitions_loc.q.out | 8 +- .../clientpositive/rcfile_format_part.q.out | 14 +- .../rcfile_nonstd_partitions_loc.q.out | 6 +- .../jdbc/TestTriggersTezSessionPoolManager.java | 4 + .../apache/hadoop/hive/ql/exec/MemoryInfo.java | 114 + .../hadoop/hive/ql/optimizer/Optimizer.java | 7 +- .../optimizer/SortedDynPartitionOptimizer.java | 112 +- .../physical/LlapClusterStateForCompile.java | 12 + .../hadoop/hive/ql/parse/TezCompiler.java | 24 + .../test/queries/clientpositive/dp_counter_mm.q | 1 + .../queries/clientpositive/dp_counter_non_mm.q | 1 + .../dynpart_sort_opt_vectorization.q | 25 +- .../clientpositive/dynpart_sort_optimization.q | 63 +- .../clientpositive/dynpart_sort_optimization2.q | 9 +- .../dynpart_sort_optimization_acid.q | 12 +- .../dynpart_sort_optimization_acid2.q | 2 +- .../clientpositive/load_data_using_job.q | 2 + .../queries/clientpositive/tez_input_counters.q | 1 + .../vector_partitioned_date_time.q | 3 +- .../clientpositive/acid_table_stats.q.out | 16 +- .../dynpart_sort_optimization_acid2.q.out | 88 +- .../llap/auto_sortmerge_join_16.q.out | 32 +- 
.../llap/dynpart_sort_opt_vectorization.q.out | 44 +- .../llap/dynpart_sort_optimization.q.out | 489 ++- .../llap/dynpart_sort_optimization2.q.out | 8 +- .../llap/dynpart_sort_optimization_acid.q.out | 26 +- .../extrapolate_part_stats_partial_ndv.q.out | 8 +- .../results/clientpositive/llap/lineage3.q.out | 2 +- .../clientpositive/llap/llap_partitioned.q.out | 8 +- .../results/clientpositive/llap/llap_smb.q.out | 14 +- .../clientpositive/llap/llap_stats.q.out | 4 +- .../llap/load_data_using_job.q.out | 3624 +++++++++--------- .../clientpositive/llap/load_dyn_part5.q.out | 60 +- .../llap/materialized_view_partitioned_3.q.out | 2 - .../results/clientpositive/llap/mm_dp.q.out | 4 +- .../clientpositive/llap/orc_analyze.q.out | 6 +- .../clientpositive/llap/orc_merge1.q.out | 187 +- .../clientpositive/llap/orc_merge10.q.out | 338 +- .../clientpositive/llap/orc_merge2.q.out | 60 +- .../clientpositive/llap/orc_merge7.q.out | 108 +- .../clientpositive/llap/orc_merge_diff_fs.q.out | 176 +- .../llap/orc_merge_incompat2.q.out | 54 +- .../clientpositive/llap/subquery_notin.q.out | 24 +- .../results/clientpositive/llap/tez_dml.q.out | 60 +- .../llap/vector_count_distinct.q.out | 18 +- .../llap/vector_partitioned_date_time.q.out | 2730 ++++++------- 52 files changed, 4726 insertions(+), 4182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ab86200..bcf1e9e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2230,6 +2230,16 @@ public class HiveConf extends Configuration { "When enabled dynamic partitioning column will be globally sorted.\n" + 
"This way we can keep only one record writer open for each partition value\n" + "in the reducer thereby reducing the memory pressure on reducers."), + HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD("hive.optimize.sort.dynamic.partition.threshold", 0, + "When enabled dynamic partitioning column will be globally sorted.\n" + + "This way we can keep only one record writer open for each partition value\n" + + "in the reducer thereby reducing the memory pressure on reducers.\n" + + "This config has following possible values: \n" + + "\t-1 - This completely disables the optimization. \n" + + "\t1 - This always enable the optimization. \n" + + "\t0 - This makes the optimization a cost based decision. \n" + + "Setting it to any other positive integer will make Hive use this as threshold for number of writers."), + HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, "Uses sampling on order-by clause for parallel execution."), HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, "Total number of samples to be obtained."), http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/insert_into_dynamic_partitions.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/insert_into_dynamic_partitions.q.out b/itests/hive-blobstore/src/test/results/clientpositive/insert_into_dynamic_partitions.q.out index b42f966..80dbbee 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/insert_into_dynamic_partitions.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/insert_into_dynamic_partitions.q.out @@ -84,7 +84,8 @@ POSTHOOK: Input: _dummy_database@_dummy_table STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 depends on stages: Stage-1 - Stage-2 depends on stages: Stage-0 + Stage-2 depends on stages: Stage-0, Stage-3 + Stage-3 depends on stages: Stage-1 STAGE PLANS: Stage: Stage-1 @@ -107,13 
+108,12 @@ STAGE PLANS: outputColumnNames: _col0, _col1 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator - key expressions: _col1 (type: string), _bucket_number (type: string) - null sort order: aa - sort order: ++ - Map-reduce partition columns: _col1 (type: string) + null sort order: + sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE tag: -1 - value expressions: _col0 (type: int) + value expressions: _col0 (type: int), _col1 (type: string) auto parallelism: false Path -> Alias: #### A masked pattern was here #### @@ -159,16 +159,15 @@ STAGE PLANS: Needs Tagging: false Reduce Operator Tree: Select Operator - expressions: VALUE._col0 (type: int), KEY._col1 (type: string), KEY._bucket_number (type: string) - outputColumnNames: _col0, _col1, _bucket_number - Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE File Output Operator compressed: false GlobalTableId: 1 directory: ### BLOBSTORE_STAGING_PATH ### - Dp Sort State: PARTITION_BUCKET_SORTED NumFilesPerFileSink: 1 - Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE Stats Publishing Key Prefix: ### BLOBSTORE_STAGING_PATH ### table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -195,6 +194,34 @@ STAGE PLANS: TotalFiles: 1 GatherStats: true MultiFileSpray: false + Select Operator + expressions: _col0 (type: int), _col1 (type: string) + outputColumnNames: id, key + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: compute_stats(id, 'hll') + keys: key (type: 
string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 608 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + column.name.delimiter , + columns _col0,_col1 + columns.types string,struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary> + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false Stage: Stage-0 Move Operator @@ -236,6 +263,83 @@ STAGE PLANS: Table: default.table1 Is Table Level Stats: false + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 608 Basic stats: COMPLETE Column stats: COMPLETE + tag: -1 + value expressions: _col1 (type: struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>) + auto parallelism: false + Execution mode: vectorized + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + column.name.delimiter , + columns _col0,_col1 + columns.types string,struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary> + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + column.name.delimiter , + columns _col0,_col1 + columns.types string,struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary> + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: compute_stats(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 624 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col1 (type: struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 624 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 624 Basic stats: COMPLETE Column stats: COMPLETE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>:string + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + PREHOOK: query: DROP TABLE table1 PREHOOK: type: DROPTABLE PREHOOK: Input: default@table1 http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/insert_overwrite_dynamic_partitions.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/insert_overwrite_dynamic_partitions.q.out b/itests/hive-blobstore/src/test/results/clientpositive/insert_overwrite_dynamic_partitions.q.out index 13d64fb..a8cdc8f 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/insert_overwrite_dynamic_partitions.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/insert_overwrite_dynamic_partitions.q.out @@ -102,7 +102,8 @@ POSTHOOK: Input: _dummy_database@_dummy_table STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 depends on stages: Stage-1 - Stage-2 depends on stages: Stage-0 + Stage-2 depends on stages: Stage-0, Stage-3 + Stage-3 depends on stages: Stage-1 STAGE PLANS: Stage: Stage-1 @@ -125,13 +126,12 @@ STAGE PLANS: outputColumnNames: _col0, _col1 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator - key expressions: _col1 (type: string), _bucket_number (type: string) - null sort order: aa - sort order: ++ - Map-reduce partition columns: _col1 (type: string) + null sort order: + sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE tag: -1 - value expressions: _col0 (type: int) + value expressions: _col0 (type: int), _col1 (type: string) auto parallelism: false Path -> Alias: #### A masked pattern was here #### @@ -177,16 +177,15 @@ STAGE PLANS: Needs Tagging: false Reduce Operator Tree: Select Operator - expressions: VALUE._col0 (type: int), KEY._col1 (type: 
string), KEY._bucket_number (type: string) - outputColumnNames: _col0, _col1, _bucket_number - Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE File Output Operator compressed: false GlobalTableId: 1 directory: ### BLOBSTORE_STAGING_PATH ### - Dp Sort State: PARTITION_BUCKET_SORTED NumFilesPerFileSink: 1 - Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE Stats Publishing Key Prefix: ### BLOBSTORE_STAGING_PATH ### table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -213,6 +212,34 @@ STAGE PLANS: TotalFiles: 1 GatherStats: true MultiFileSpray: false + Select Operator + expressions: _col0 (type: int), _col1 (type: string) + outputColumnNames: id, key + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: compute_stats(id, 'hll') + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 608 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + column.name.delimiter , + columns _col0,_col1 + columns.types string,struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary> + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false Stage: Stage-0 Move 
Operator @@ -254,6 +281,83 @@ STAGE PLANS: Table: default.table1 Is Table Level Stats: false + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 608 Basic stats: COMPLETE Column stats: COMPLETE + tag: -1 + value expressions: _col1 (type: struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>) + auto parallelism: false + Execution mode: vectorized + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + column.name.delimiter , + columns _col0,_col1 + columns.types string,struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary> + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + column.name.delimiter , + columns _col0,_col1 + columns.types string,struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary> + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: compute_stats(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 624 Basic 
stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col1 (type: struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 624 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 624 Basic stats: COMPLETE Column stats: COMPLETE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>:string + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + PREHOOK: query: DROP TABLE table1 PREHOOK: type: DROPTABLE PREHOOK: Input: default@table1 http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/orc_format_part.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/orc_format_part.q.out b/itests/hive-blobstore/src/test/results/clientpositive/orc_format_part.q.out index 7b25613..826fae9 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/orc_format_part.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/orc_format_part.q.out @@ -143,7 +143,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: 
default@orc_events #### A masked pattern was here #### -200 +0 PREHOOK: query: SELECT COUNT(*) FROM orc_events WHERE run_date=20120921 PREHOOK: type: QUERY PREHOOK: Input: default@orc_events @@ -152,7 +152,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events WHERE run_date=20120921 POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -50 +0 PREHOOK: query: SELECT COUNT(*) FROM orc_events WHERE run_date=20121121 PREHOOK: type: QUERY PREHOOK: Input: default@orc_events @@ -161,7 +161,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events WHERE run_date=20121121 POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -100 +0 PREHOOK: query: INSERT OVERWRITE TABLE orc_events PARTITION (run_date=201211, game_id, event_name) SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid,game_id,event_name FROM src_events WHERE SUBSTR(run_date,1,6)='201211' @@ -200,7 +200,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -300 +0 PREHOOK: query: INSERT INTO TABLE orc_events PARTITION (run_date=201209, game_id=39, event_name) SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid,event_name FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 @@ -229,7 +229,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -350 +0 PREHOOK: query: INSERT INTO TABLE orc_events PARTITION (run_date=201209, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 AND event_name='hq_change' @@ -258,7 +258,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -400 +50 PREHOOK: query: INSERT 
OVERWRITE TABLE orc_events PARTITION (run_date=201209, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 AND event_name='hq_change' @@ -287,4 +287,4 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -350 +50 http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/orc_nonstd_partitions_loc.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/orc_nonstd_partitions_loc.q.out b/itests/hive-blobstore/src/test/results/clientpositive/orc_nonstd_partitions_loc.q.out index 1201ce2..bb63070 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/orc_nonstd_partitions_loc.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/orc_nonstd_partitions_loc.q.out @@ -143,7 +143,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -200 +0 PREHOOK: query: ALTER TABLE orc_events ADD PARTITION (run_date=201211, game_id=39, event_name='hq_change') #### A masked pattern was here #### PREHOOK: type: ALTERTABLE_ADDPARTS @@ -193,7 +193,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -300 +100 PREHOOK: query: INSERT INTO TABLE orc_events PARTITION (run_date=201211, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201211' @@ -232,7 +232,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -400 +200 PREHOOK: query: ALTER TABLE 
orc_events ADD PARTITION (run_date=201209, game_id=39, event_name='hq_change') #### A masked pattern was here #### PREHOOK: type: ALTERTABLE_ADDPARTS @@ -303,7 +303,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM orc_events POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_events #### A masked pattern was here #### -500 +300 PREHOOK: query: INSERT OVERWRITE TABLE orc_events PARTITION (run_date, game_id, event_name) SELECT * FROM src_events PREHOOK: type: QUERY http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/parquet_format_part.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/parquet_format_part.q.out b/itests/hive-blobstore/src/test/results/clientpositive/parquet_format_part.q.out index 0931e3d..7758dc9 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/parquet_format_part.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/parquet_format_part.q.out @@ -143,7 +143,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -200 +0 PREHOOK: query: SELECT COUNT(*) FROM parquet_events WHERE run_date=20120921 PREHOOK: type: QUERY PREHOOK: Input: default@parquet_events @@ -152,7 +152,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events WHERE run_date=20120921 POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -50 +0 PREHOOK: query: SELECT COUNT(*) FROM parquet_events WHERE run_date=20121121 PREHOOK: type: QUERY PREHOOK: Input: default@parquet_events @@ -161,7 +161,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events WHERE run_date=20121121 POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -100 +0 PREHOOK: query: INSERT OVERWRITE TABLE parquet_events PARTITION (run_date=201211, game_id, 
event_name) SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid,game_id,event_name FROM src_events WHERE SUBSTR(run_date,1,6)='201211' @@ -200,7 +200,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -300 +0 PREHOOK: query: INSERT INTO TABLE parquet_events PARTITION (run_date=201209, game_id=39, event_name) SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid,event_name FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 @@ -229,7 +229,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -350 +0 PREHOOK: query: INSERT INTO TABLE parquet_events PARTITION (run_date=201209, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 AND event_name='hq_change' @@ -258,7 +258,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -400 +50 PREHOOK: query: INSERT OVERWRITE TABLE parquet_events PARTITION (run_date=201209, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 AND event_name='hq_change' @@ -287,4 +287,4 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -350 +50 http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/parquet_nonstd_partitions_loc.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/parquet_nonstd_partitions_loc.q.out 
b/itests/hive-blobstore/src/test/results/clientpositive/parquet_nonstd_partitions_loc.q.out index 15ae3d9..0ccd0e4 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/parquet_nonstd_partitions_loc.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/parquet_nonstd_partitions_loc.q.out @@ -143,7 +143,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -200 +0 PREHOOK: query: ALTER TABLE parquet_events ADD PARTITION (run_date=201211, game_id=39, event_name='hq_change') #### A masked pattern was here #### PREHOOK: type: ALTERTABLE_ADDPARTS @@ -193,7 +193,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -300 +100 PREHOOK: query: INSERT INTO TABLE parquet_events PARTITION (run_date=201211, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201211' @@ -232,7 +232,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -400 +200 PREHOOK: query: ALTER TABLE parquet_events ADD PARTITION (run_date=201209, game_id=39, event_name='hq_change') #### A masked pattern was here #### PREHOOK: type: ALTERTABLE_ADDPARTS @@ -303,7 +303,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM parquet_events POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet_events #### A masked pattern was here #### -500 +300 PREHOOK: query: INSERT OVERWRITE TABLE parquet_events PARTITION (run_date, game_id, event_name) SELECT * FROM src_events PREHOOK: type: QUERY http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/rcfile_format_part.q.out ---------------------------------------------------------------------- diff --git 
a/itests/hive-blobstore/src/test/results/clientpositive/rcfile_format_part.q.out b/itests/hive-blobstore/src/test/results/clientpositive/rcfile_format_part.q.out index 24fc525..340791a 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/rcfile_format_part.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/rcfile_format_part.q.out @@ -143,7 +143,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -200 +0 PREHOOK: query: SELECT COUNT(*) FROM rcfile_events WHERE run_date=20120921 PREHOOK: type: QUERY PREHOOK: Input: default@rcfile_events @@ -152,7 +152,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events WHERE run_date=20120921 POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -50 +0 PREHOOK: query: SELECT COUNT(*) FROM rcfile_events WHERE run_date=20121121 PREHOOK: type: QUERY PREHOOK: Input: default@rcfile_events @@ -161,7 +161,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events WHERE run_date=20121121 POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -100 +0 PREHOOK: query: INSERT OVERWRITE TABLE rcfile_events PARTITION (run_date=201211, game_id, event_name) SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid,game_id,event_name FROM src_events WHERE SUBSTR(run_date,1,6)='201211' @@ -200,7 +200,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -300 +0 PREHOOK: query: INSERT INTO TABLE rcfile_events PARTITION (run_date=201209, game_id=39, event_name) SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid,event_name FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 @@ -229,7 +229,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: 
default@rcfile_events #### A masked pattern was here #### -350 +0 PREHOOK: query: INSERT INTO TABLE rcfile_events PARTITION (run_date=201209, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 AND event_name='hq_change' @@ -258,7 +258,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -400 +50 PREHOOK: query: INSERT OVERWRITE TABLE rcfile_events PARTITION (run_date=201209, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201209' AND game_id=39 AND event_name='hq_change' @@ -287,4 +287,4 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -350 +50 http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-blobstore/src/test/results/clientpositive/rcfile_nonstd_partitions_loc.q.out ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/results/clientpositive/rcfile_nonstd_partitions_loc.q.out b/itests/hive-blobstore/src/test/results/clientpositive/rcfile_nonstd_partitions_loc.q.out index 6bcfe41..1608422 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/rcfile_nonstd_partitions_loc.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/rcfile_nonstd_partitions_loc.q.out @@ -143,7 +143,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -200 +0 PREHOOK: query: ALTER TABLE rcfile_events ADD PARTITION (run_date=201211, game_id=39, event_name='hq_change') #### A masked pattern was here #### PREHOOK: type: ALTERTABLE_ADDPARTS @@ -193,7 +193,7 @@ POSTHOOK: 
query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -300 +100 PREHOOK: query: INSERT INTO TABLE rcfile_events PARTITION (run_date=201211, game_id=39, event_name='hq_change') SELECT log_id,`time`,uid,user_id,type,event_data,session_id,full_uid FROM src_events WHERE SUBSTR(run_date,1,6)='201211' @@ -232,7 +232,7 @@ POSTHOOK: query: SELECT COUNT(*) FROM rcfile_events POSTHOOK: type: QUERY POSTHOOK: Input: default@rcfile_events #### A masked pattern was here #### -400 +200 PREHOOK: query: ALTER TABLE rcfile_events ADD PARTITION (run_date=201209, game_id=39, event_name='hq_change') #### A masked pattern was here #### PREHOOK: type: ALTERTABLE_ADDPARTS http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index afea596..faab11a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -169,6 +169,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest List<String> cmds = getConfigs(); cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.optimize.sort.dynamic.partition.threshold=-1"); Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5"); Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); 
setupTriggers(Lists.newArrayList(trigger)); @@ -193,6 +194,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest cmds.add("create table src2 (key int) partitioned by (value string)"); cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.optimize.sort.dynamic.partition.threshold=-1"); // query will get cancelled before creating 57 partitions String query = "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100"; @@ -237,6 +239,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest cmds.add("create table src3 (key int) partitioned by (value string)"); cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.optimize.sort.dynamic.partition.threshold=-1"); String query = "from " + tableName + " insert overwrite table src2 partition (value) select * where under_col < 100 " + @@ -254,6 +257,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest cmds.add("create table src2 (key int) partitioned by (value string)"); cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.optimize.sort.dynamic.partition.threshold=-1"); // query will get cancelled before creating 57 partitions String query = "insert overwrite table src2 partition (value) " + http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryInfo.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryInfo.java new file mode 100644 index 0000000..4c91c49 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryInfo.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; + +/** + * Contains information about executor memory, various memory thresholds used for join conversions etc. based on + * execution engine. 
+ **/ + +public class MemoryInfo { + + private Configuration conf; + private boolean isTez; + private boolean isLlap; + private long maxExecutorMemory; + private long mapJoinMemoryThreshold; + private long dynPartJoinMemoryThreshold; + + public MemoryInfo(Configuration conf) { + this.isTez = "tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)); + this.isLlap = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + if (isLlap) { + LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf); + llapInfo.initClusterInfo(); + if (llapInfo.hasClusterInfo()) { + this.maxExecutorMemory = llapInfo.getMemoryPerExecutor(); + } else { + long memPerInstance = + HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024L * 1024L; + long numExecutors = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS); + this.maxExecutorMemory = memPerInstance / numExecutors; + } + } else { + if (isTez) { + float heapFraction = HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION); + int containerSizeMb = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? 
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : + conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); + // this can happen when config is explicitly set to "-1", in which case defaultValue also does not work + if (containerSizeMb < 0) { + containerSizeMb = MRJobConfig.DEFAULT_MAP_MEMORY_MB; + } + this.maxExecutorMemory = (long) ((containerSizeMb * 1024L * 1024L) * heapFraction); + } else { + this.maxExecutorMemory = + conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB) * 1024L * 1024L; + // this can happen when config is explicitly set to "-1", in which case defaultValue also does not work + if (maxExecutorMemory < 0) { + maxExecutorMemory = MRJobConfig.DEFAULT_MAP_MEMORY_MB * 1024L * 1024L; + } + } + } + } + + public Configuration getConf() { + return conf; + } + + public void setConf(final Configuration conf) { + this.conf = conf; + } + + public boolean isTez() { + return isTez; + } + + public boolean isLlap() { + return isLlap; + } + + public long getMaxExecutorMemory() { + return maxExecutorMemory; + } + + public long getMapJoinMemoryThreshold() { + return mapJoinMemoryThreshold; + } + + public long getDynPartJoinMemoryThreshold() { + return dynPartJoinMemoryThreshold; + } + + @Override + public String toString() { + return "MEMORY INFO - { isTez: " + isTez() + + ", isLlap: " + isLlap() + + ", maxExecutorMemory: " + LlapUtil.humanReadableByteCount(getMaxExecutorMemory()) + + ", mapJoinMemoryThreshold: "+ LlapUtil.humanReadableByteCount(getMapJoinMemoryThreshold()) + + ", dynPartJoinMemoryThreshold: " + LlapUtil.humanReadableByteCount(getDynPartJoinMemoryThreshold()) + + " }"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index 71f7380..25e9cd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -115,12 +115,7 @@ public class Optimizer { transformations.add(new ConstantPropagate()); } - if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) && - HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") && - HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) && - !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) { - transformations.add(new SortedDynPartitionOptimizer()); - } + transformations.add(new SortedDynPartitionTimeGranularityOptimizer()); http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 314b8b4..4d9963a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -27,11 +27,13 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.MemoryInfo; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.OperatorUtils; @@ 
-56,6 +58,7 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ColStatistics; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -70,6 +73,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.orc.OrcConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +90,7 @@ import com.google.common.collect.Sets; public class SortedDynPartitionOptimizer extends Transform { private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number"; + @Override public ParseContext transform(ParseContext pCtx) throws SemanticException { @@ -159,6 +164,12 @@ public class SortedDynPartitionOptimizer extends Transform { return null; } + DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx(); + List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema()); + + if (!shouldDo(partitionPositions, fsParent)) { + return null; + } // if RS is inserted by enforce bucketing or sorting, we need to remove it // since ReduceSinkDeDuplication will not merge them to single RS. // RS inserted by enforce bucketing/sorting will have bucketing column in @@ -175,13 +186,13 @@ public class SortedDynPartitionOptimizer extends Transform { // unlink connection between FS and its parent fsParent = fsOp.getParentOperators().get(0); + fsParent.getChildOperators().clear(); - DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx(); - int numBuckets = destTable.getNumBuckets(); // if enforce bucketing/sorting is disabled numBuckets will not be set. 
// set the number of buckets here to ensure creation of empty buckets + int numBuckets = destTable.getNumBuckets(); dpCtx.setNumBuckets(numBuckets); // Get the positions for partition, bucket and sort columns @@ -206,7 +217,7 @@ public class SortedDynPartitionOptimizer extends Transform { * which extracts bucketId from it * see {@link org.apache.hadoop.hive.ql.udf.UDFToInteger#evaluate(RecordIdentifier)}*/ ColumnInfo ci = fsParent.getSchema().getSignature().get(0); - if(!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) { + if (!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) { throw new IllegalStateException("expected 1st column to be ROW__ID but got wrong type: " + ci.toString()); } //add a cast(ROW__ID as int) to wrap in UDFToInteger() @@ -230,10 +241,15 @@ public class SortedDynPartitionOptimizer extends Transform { sortNullOrder.add(order == 1 ? 0 : 1); // for asc, nulls first; for desc, nulls last } LOG.debug("Got sort order"); - for (int i : sortPositions) LOG.debug("sort position " + i); - for (int i : sortOrder) LOG.debug("sort order " + i); - for (int i : sortNullOrder) LOG.debug("sort null order " + i); - List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema()); + for (int i : sortPositions) { + LOG.debug("sort position " + i); + } + for (int i : sortOrder) { + LOG.debug("sort order " + i); + } + for (int i : sortNullOrder) { + LOG.debug("sort null order " + i); + } // update file sink descriptor fsOp.getConf().setMultiFileSpray(false); @@ -248,7 +264,8 @@ public class SortedDynPartitionOptimizer extends Transform { // Create ReduceSink operator ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, sortNullOrder, - allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType()); + allRSCols, bucketColumns, numBuckets, + fsParent, fsOp.getConf().getWriteType()); List<ExprNodeDesc> descs = new ArrayList<ExprNodeDesc>(allRSCols.size()); List<String> colNames = new 
ArrayList<String>(); @@ -265,9 +282,11 @@ public class SortedDynPartitionOptimizer extends Transform { } RowSchema selRS = new RowSchema(fsParent.getSchema()); if (!bucketColumns.isEmpty()) { - descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, ReduceField.KEY.toString()+"."+BUCKET_NUMBER_COL_NAME, null, false)); + descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, + ReduceField.KEY.toString()+"."+BUCKET_NUMBER_COL_NAME, null, false)); colNames.add(BUCKET_NUMBER_COL_NAME); - ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, selRS.getSignature().get(0).getTabAlias(), true, true); + ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, + selRS.getSignature().get(0).getTabAlias(), true, true); selRS.getSignature().add(ci); fsParent.getSchema().getSignature().add(ci); } @@ -290,7 +309,7 @@ public class SortedDynPartitionOptimizer extends Transform { } // update partition column info in FS descriptor - fsOp.getConf().setPartitionCols( rsOp.getConf().getPartitionCols()); + fsOp.getConf().setPartitionCols(rsOp.getConf().getPartitionCols()); LOG.info("Inserted " + rsOp.getOperatorId() + " and " + selOp.getOperatorId() + " as parent of " + fsOp.getOperatorId() + " and child of " + fsParent.getOperatorId()); @@ -478,7 +497,7 @@ public class SortedDynPartitionOptimizer extends Transform { String orderStr = ""; for (Integer i : newSortOrder) { - if(i == 1) { + if (i == 1) { orderStr += "+"; } else { orderStr += "-"; @@ -502,7 +521,7 @@ public class SortedDynPartitionOptimizer extends Transform { String nullOrderStr = ""; for (Integer i : newSortNullOrder) { - if(i == 0) { + if (i == 0) { nullOrderStr += "a"; } else { nullOrderStr += "z"; @@ -517,7 +536,7 @@ public class SortedDynPartitionOptimizer extends Transform { for (Integer idx : keyColsPosInVal) { if (idx < 0) { ExprNodeDesc bucketNumColUDF = ExprNodeGenericFuncDesc.newInstance( - 
FunctionRegistry.getFunctionInfo("bucket_number").getGenericUDF(), new ArrayList<>()); + FunctionRegistry.getFunctionInfo("bucket_number").getGenericUDF(), new ArrayList<>()); keyCols.add(bucketNumColUDF); colExprMap.put(Utilities.ReduceField.KEY + "." +BUCKET_NUMBER_COL_NAME, bucketNumColUDF); } else { @@ -598,7 +617,8 @@ public class SortedDynPartitionOptimizer extends Transform { } /** - * Get the sort positions for the sort columns + * Get the sort positions for the sort columns. + * + * @param tabSortCols + * @param tabCols + * @return @@ -620,7 +640,8 @@ public class SortedDynPartitionOptimizer extends Transform { } /** - * Get the sort order for the sort columns + * Get the sort order for the sort columns. + * + * @param tabSortCols + * @param tabCols + * @return @@ -652,6 +673,63 @@ public class SortedDynPartitionOptimizer extends Transform { return cols; } - } + // the idea is to estimate how many writers this insert can spin up. + // Writers are proportional to the number of partitions being inserted, i.e. the cardinality of the partition columns. + // if these writers are fewer than the number of writers allowed within the memory pool (estimated) we go ahead with + // adding extra RS + // The max number of writers allowed is computed by taking + // (executor/container memory) * (percentage of memory taken by orc) + // and dividing that by the max memory (stripe size) taken by a single writer. + private boolean shouldDo(List<Integer> partitionPos, Operator<?
extends OperatorDesc> fsParent) { + + int threshold = HiveConf.getIntVar(this.parseCtx.getConf(), + HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD); + long MAX_WRITERS = -1; + + switch (threshold) { + case -1: + return false; + case 0: + break; + case 1: + return true; + default: + MAX_WRITERS = threshold; + break; + } + + List<ColStatistics> colStats = fsParent.getStatistics().getColumnStats(); + if (colStats == null || colStats.isEmpty()) { + return true; + } + long partCardinality = 1; + + // compute cardinality for partition columns + for (Integer idx : partitionPos) { + ColumnInfo ci = fsParent.getSchema().getSignature().get(idx); + ColStatistics partStats = fsParent.getStatistics().getColumnStatisticsFromColName(ci.getInternalName()); + if (partStats == null) { + // statistics for this partition are for some reason not available + return true; + } + partCardinality = partCardinality * partStats.getCountDistint(); + } + + if (MAX_WRITERS < 0) { + double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(), + (Double) OrcConf.MEMORY_POOL.getDefaultValue()); + long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), + (Long) OrcConf.STRIPE_SIZE.getDefaultValue()); + MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf()); + LOG.debug("Memory info during SDPO opt: {}", memoryInfo); + long executorMem = memoryInfo.getMaxExecutorMemory(); + MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize; + } + if (partCardinality <= MAX_WRITERS) { + return false; + } + return true; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index 0d1990a..a15bdd5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -44,6 +44,7 @@ public class LlapClusterStateForCompile { private volatile Long lastClusterUpdateNs; private volatile Integer noConfigNodeCount, executorCount; private volatile int numExecutorsPerNode = -1; + private volatile long memoryPerInstance = -1; private LlapRegistryService svc; private final Configuration conf; private final long updateIntervalNs; @@ -93,6 +94,14 @@ public class LlapClusterStateForCompile { return numExecutorsPerNode; } + public long getMemoryPerInstance() { + return memoryPerInstance; + } + + public long getMemoryPerExecutor() { + return getMemoryPerInstance() / getNumExecutorsPerNode(); + } + private boolean isUpdateNeeded() { Long lastUpdateLocal = lastClusterUpdateNs; if (lastUpdateLocal == null) return true; @@ -134,6 +143,9 @@ public class LlapClusterStateForCompile { if (numExecutorsPerNode == -1) { numExecutorsPerNode = numExecutors; } + if (memoryPerInstance == -1) { + memoryPerInstance = si.getResource().getMemorySize() * 1024L * 1024L; + } } catch (NumberFormatException e) { ++noConfigNodesLocal; } http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index fdc9635..91d2f1f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -77,11 +77,14 @@ import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcCtx.ConstantProp import 
org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin; import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization; import org.apache.hadoop.hive.ql.optimizer.MergeJoinProc; +import org.apache.hadoop.hive.ql.optimizer.NonBlockingOpDeDupProc; import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc; import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize; import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism; import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer; +import org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer; import org.apache.hadoop.hive.ql.optimizer.TopNKeyProcessor; +import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication; import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication; import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer; @@ -170,6 +173,27 @@ public class TezCompiler extends TaskCompiler { // Update bucketing version of ReduceSinkOp if needed updateBucketingVersionForUpgrade(procCtx); + // run Sorted dynamic partition optimization + if(HiveConf.getBoolVar(procCtx.conf, HiveConf.ConfVars.DYNAMICPARTITIONING) && + HiveConf.getVar(procCtx.conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") && + !HiveConf.getBoolVar(procCtx.conf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) { + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); + new SortedDynPartitionOptimizer().transform(procCtx.parseContext); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Sorted dynamic partition optimization"); + } + + if(HiveConf.getBoolVar(procCtx.conf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) + || procCtx.parseContext.hasAcidWrite()) { + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); + // Dynamic sort partition adds an extra RS 
therefore need to de-dup + new ReduceSinkDeDuplication().transform(procCtx.parseContext); + // there is an issue with dedup logic wherein SELECT is created with wrong columns + // NonBlockingOpDeDupProc fixes that + // (kind of hackish, the issue in de-dup should be fixed but it needs more investigation) + new NonBlockingOpDeDupProc().transform(procCtx.parseContext); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Reduce Sink de-duplication"); + } + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); // run the optimizations that use stats for optimization runStatsDependentOptimizations(procCtx, inputs, outputs); http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dp_counter_mm.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dp_counter_mm.q b/ql/src/test/queries/clientpositive/dp_counter_mm.q index 91c4f42..1c655fd 100644 --- a/ql/src/test/queries/clientpositive/dp_counter_mm.q +++ b/ql/src/test/queries/clientpositive/dp_counter_mm.q @@ -4,6 +4,7 @@ set hive.exec.max.dynamic.partitions.pernode=200; set hive.exec.max.dynamic.partitions=200; set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.optimize.sort.dynamic.partition.threshold=-1; drop table src2_n5; create table src2_n5 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dp_counter_non_mm.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dp_counter_non_mm.q b/ql/src/test/queries/clientpositive/dp_counter_non_mm.q index 561ae6e..d2da0b0 100644 --- a/ql/src/test/queries/clientpositive/dp_counter_non_mm.q +++ 
b/ql/src/test/queries/clientpositive/dp_counter_non_mm.q @@ -2,6 +2,7 @@ set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.max.dynamic.partitions.pernode=200; set hive.exec.max.dynamic.partitions=200; +set hive.optimize.sort.dynamic.partition.threshold=-1; drop table src2_n3; create table src2_n3 (key int) partitioned by (value string); http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q b/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q index 3c2918f..8a3522d 100644 --- a/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q +++ b/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q @@ -8,8 +8,7 @@ set hive.exec.max.dynamic.partitions=1000; set hive.exec.max.dynamic.partitions.pernode=1000; set hive.exec.dynamic.partition.mode=nonstrict; set hive.vectorized.execution.enabled=true; - - +set hive.optimize.sort.dynamic.partition.threshold=1; create table over1k_n1( t tinyint, @@ -106,19 +105,19 @@ create table over1k_part2_orc( f float) partitioned by (ds string, t tinyint); -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i; explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from (select * from over1k_orc order by i limit 10) tmp where t is null or t=27; -set hive.optimize.sort.dynamic.partition=false; +set 
hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 group by si,i,b,f,t; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; -- tests for HIVE-8162, only partition column 't' should be in last RS operator explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 group by si,i,b,f,t; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i; desc formatted over1k_part2_orc partition(ds="foo",t=27); @@ -128,7 +127,7 @@ desc formatted over1k_part2_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__ select * from over1k_part2_orc; select count(*) from over1k_part2_orc; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i; desc formatted over1k_part2_orc partition(ds="foo",t=27); @@ -150,12 +149,12 @@ create table over1k_part_buck_sort2_orc( clustered by (si) sorted by (f) into 1 buckets; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; insert overwrite table 
over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27; desc formatted over1k_part_buck_sort2_orc partition(t=27); @@ -166,7 +165,7 @@ select * from over1k_part_buck_sort2_orc; explain select count(*) from over1k_part_buck_sort2_orc; select count(*) from over1k_part_buck_sort2_orc; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27; desc formatted over1k_part_buck_sort2_orc partition(t=27); @@ -194,7 +193,7 @@ partitioned by (s string) clustered by (si) into 2 buckets stored as orc tblproperties ('transactional'='true'); -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert into table addcolumns_vectorization_true_disallowincompatible_true_fileformat_orc_tinyint partition (s) select cint,csmallint, cstring1 from alltypesorc limit 10; http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q b/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q index 436c0ed..dbeb874 100644 --- a/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q +++ b/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q @@ -7,8 +7,7 @@ set hive.exec.dynamic.partition=true; set hive.exec.max.dynamic.partitions=1000; set hive.exec.max.dynamic.partitions.pernode=1000; set hive.exec.dynamic.partition.mode=nonstrict; - - +set hive.optimize.sort.dynamic.partition.threshold=1; create table over1k_n3( t tinyint, @@ -100,19 +99,19 @@ create table over1k_part2( f float) partitioned by (ds string, t tinyint); -set hive.optimize.sort.dynamic.partition=false; +set 
hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k_n3 where t is null or t=27 order by i; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k_n3 where t is null or t=27 order by i; explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from (select * from over1k_n3 order by i limit 10) tmp where t is null or t=27; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k_n3 where t is null or t=27 group by si,i,b,f,t; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; -- tests for HIVE-8162, only partition column 't' should be in last RS operator explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k_n3 where t is null or t=27 group by si,i,b,f,t; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k_n3 where t is null or t=27 order by i; desc formatted over1k_part2 partition(ds="foo",t=27); @@ -122,7 +121,7 @@ desc formatted over1k_part2 partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__"); select * from over1k_part2; select count(*) from over1k_part2; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k_n3 where t is null or t=27 order by i; desc formatted over1k_part2 partition(ds="foo",t=27); @@ -144,12 +143,12 @@ create table over1k_part_buck_sort2( clustered by (si) sorted by (f) into 1 
buckets; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k_n3 where t is null or t=27; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k_n3 where t is null or t=27; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k_n3 where t is null or t=27; desc formatted over1k_part_buck_sort2 partition(t=27); @@ -158,7 +157,7 @@ desc formatted over1k_part_buck_sort2 partition(t="__HIVE_DEFAULT_PARTITION__"); select * from over1k_part_buck_sort2; select count(*) from over1k_part_buck_sort2; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k_n3 where t is null or t=27; desc formatted over1k_part_buck_sort2 partition(t=27); @@ -173,7 +172,7 @@ create table over1k_part3( f float) partitioned by (s string, t tinyint, i int); -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from over1k_n3 where s="foo"; explain insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from over1k_n3 where t=27; explain insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from over1k_n3 where i=100; @@ -209,3 +208,41 @@ insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from ov insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from over1k_n3 where i=100 and t=27 and s="foo"; select sum(hash(*)) from over1k_part3; + 
+drop table over1k_n3; +create table over1k_n3( + t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + `dec` decimal(4,2), + bin binary) + row format delimited + fields terminated by '|'; + +load data local inpath '../../data/files/over1k' into table over1k_n3; + +analyze table over1k_n3 compute statistics for columns; +set hive.stats.fetch.column.stats=true; + +-- default hive should do cost based and add extra RS +set hive.optimize.sort.dynamic.partition.threshold=0; +explain insert overwrite table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k_n3 where t is null or t>27; + +-- default but shouldn't add extra RS +explain insert overwrite table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k_n3 where t is null or t=27 limit 10; + +-- disable +set hive.optimize.sort.dynamic.partition.threshold=-1; +explain insert overwrite table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k_n3 where t is null or t>27; + +-- enable, will add extra RS +set hive.optimize.sort.dynamic.partition.threshold=1; +explain insert overwrite table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k_n3 where t is null or t=27 limit 10; + +drop table over1k_n3; http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dynpart_sort_optimization2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynpart_sort_optimization2.q b/ql/src/test/queries/clientpositive/dynpart_sort_optimization2.q index 6dfb51a..2f3afb6 100644 --- a/ql/src/test/queries/clientpositive/dynpart_sort_optimization2.q +++ b/ql/src/test/queries/clientpositive/dynpart_sort_optimization2.q @@ -6,8 +6,7 @@ set hive.exec.dynamic.partition=true; set hive.exec.max.dynamic.partitions=1000; set hive.exec.max.dynamic.partitions.pernode=1000; set hive.exec.dynamic.partition.mode=nonstrict; - - +set 
hive.optimize.sort.dynamic.partition.threshold=1; -- SORT_QUERY_RESULTS @@ -78,7 +77,7 @@ select * from ss_part where ss_sold_date_sk=2452617; desc formatted ss_part partition(ss_sold_date_sk=2452638); select * from ss_part where ss_sold_date_sk=2452638; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; -- SORT DYNAMIC PARTITION DISABLED explain insert overwrite table ss_part partition (ss_sold_date_sk) @@ -210,7 +209,7 @@ create table if not exists hive13_dp1 ( PARTITIONED BY(`day` string) STORED AS ORC; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; explain insert overwrite table `hive13_dp1` partition(`day`) select key k1, @@ -228,7 +227,7 @@ from src group by "day", key; select * from hive13_dp1 order by k1, k2 limit 5; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; explain insert overwrite table `hive13_dp1` partition(`day`) select key k1, http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid.q b/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid.q index d0b32cd..7e27b88 100644 --- a/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid.q +++ b/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid.q @@ -7,7 +7,7 @@ set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.exec.dynamic.partition.mode=nonstrict; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; -- single level partition, sorted dynamic partition disabled drop table if exists acid_part; @@ -31,7 +31,7 @@ select count(*) from acid_part where ds in ('2008-04-08'); delete from acid_part where key = 'foo' 
and ds='2008-04-08'; select count(*) from acid_part where ds='2008-04-08'; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; -- single level partition, sorted dynamic partition enabled drop table if exists acid_part_sdpo; @@ -54,7 +54,7 @@ select count(*) from acid_part_sdpo where ds in ('2008-04-08'); delete from acid_part_sdpo where key = 'foo' and ds='2008-04-08'; select count(*) from acid_part_sdpo where ds='2008-04-08'; -set hive.optimize.sort.dynamic.partition=false; +set hive.optimize.sort.dynamic.partition.threshold=-1; -- 2 level partition, sorted dynamic partition disabled drop table if exists acid_2L_part; @@ -83,7 +83,7 @@ delete from acid_2L_part where value = 'bar'; delete from acid_2L_part where value = 'bar'; select count(*) from acid_2L_part; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; -- 2 level partition, sorted dynamic partition enabled drop table if exists acid_2L_part_sdpo; @@ -113,7 +113,7 @@ delete from acid_2L_part_sdpo where value = 'bar'; select count(*) from acid_2L_part_sdpo; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.optimize.constant.propagation=false; -- 2 level partition, sorted dynamic partition enabled, constant propagation disabled @@ -137,4 +137,4 @@ select count(*) from acid_2L_part_sdpo_no_cp where ds='2008-04-08' and hr>=11; delete from acid_2L_part_sdpo_no_cp where key = 'foo' and ds='2008-04-08' and hr=11; select count(*) from acid_2L_part_sdpo_no_cp where ds='2008-04-08' and hr=11; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q ---------------------------------------------------------------------- diff --git 
a/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q b/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q index c852d51..ca8a442 100644 --- a/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q +++ b/ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q @@ -2,7 +2,7 @@ set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.exec.dynamic.partition.mode=nonstrict; -set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.sort.dynamic.partition.threshold=1; CREATE TABLE non_acid(key string, value string) PARTITIONED BY(ds string, hr int) http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/load_data_using_job.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/load_data_using_job.q b/ql/src/test/queries/clientpositive/load_data_using_job.q index 970a752..a2d45e5 100644 --- a/ql/src/test/queries/clientpositive/load_data_using_job.q +++ b/ql/src/test/queries/clientpositive/load_data_using_job.q @@ -9,6 +9,8 @@ set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=10000; set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ; +-- SORT_QUERY_RESULTS + -- Single partition -- Regular load happens. 
CREATE TABLE srcbucket_mapjoin_n8(key int, value string) partitioned by (ds string) STORED AS TEXTFILE; http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/tez_input_counters.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_input_counters.q b/ql/src/test/queries/clientpositive/tez_input_counters.q index 4461966..1d9d7de 100644 --- a/ql/src/test/queries/clientpositive/tez_input_counters.q +++ b/ql/src/test/queries/clientpositive/tez_input_counters.q @@ -8,6 +8,7 @@ set hive.fetch.task.conversion=none; set hive.map.aggr=false; -- disabling map side aggregation as that can lead to different intermediate record counts set hive.tez.exec.print.summary=true; +set hive.optimize.sort.dynamic.partition.threshold=-1; create table testpart (k int) partitioned by (v string); insert overwrite table testpart partition(v) select * from src; http://git-wip-us.apache.org/repos/asf/hive/blob/5eebbdf7/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q index 74ca011..eaa714e 100644 --- a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q +++ b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q @@ -4,7 +4,6 @@ set hive.fetch.task.conversion=none; -- Check if vectorization code is handling partitioning on DATE and the other data types. 
- CREATE TABLE flights_tiny_n1 ( origin_city_name STRING, dest_city_name STRING, @@ -19,6 +18,8 @@ CREATE TABLE flights_tiny_orc STORED AS ORC AS SELECT origin_city_name, dest_city_name, fl_date, to_utc_timestamp(fl_date, 'America/Los_Angeles') as fl_time, arr_delay, fl_num FROM flights_tiny_n1; +-- SORT_QUERY_RESULTS + SELECT * FROM flights_tiny_orc; SET hive.vectorized.execution.enabled=false;
