Repository: hive Updated Branches: refs/heads/branch-3 ce99abac5 -> ba621257c
HIVE-19312 : MM tables don't work with BucketizedHIF (Sergey Shelukhin, reviewed by Gunther Hagleitner) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a30a6cac Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a30a6cac Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a30a6cac Branch: refs/heads/branch-3 Commit: a30a6cac918ce90a1efc411c1d2fb6bd028f3725 Parents: ce99aba Author: sergey <[email protected]> Authored: Mon May 7 15:15:23 2018 -0700 Committer: sergey <[email protected]> Committed: Tue Jun 5 12:16:58 2018 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 1 + .../hive/ql/io/BucketizedHiveInputFormat.java | 56 +++++-- .../hadoop/hive/ql/io/HiveInputFormat.java | 29 ++-- .../rcfile/truncate/ColumnTruncateMapper.java | 4 +- ql/src/test/queries/clientpositive/mm_bhif.q | 27 ++++ .../test/results/clientpositive/mm_bhif.q.out | 146 +++++++++++++++++++ 6 files changed, 234 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a30a6cac/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 8af7bf4..7aeba5d 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -586,6 +586,7 @@ minillaplocal.query.files=\ mapjoin_hint.q,\ mapjoin_emit_interval.q,\ mergejoin_3way.q,\ + mm_bhif.q,\ mm_conversions.q,\ mm_exim.q,\ mm_loaddata.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/a30a6cac/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java index e09c6ec..58f0480 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.ql.io; +import org.apache.curator.shaded.com.google.common.collect.Lists; + +import org.apache.hadoop.hive.common.ValidWriteIdList; import java.io.IOException; import java.util.ArrayList; import java.util.List; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -50,8 +52,7 @@ import org.apache.hadoop.mapred.Reporter; public class BucketizedHiveInputFormat<K extends WritableComparable, V extends Writable> extends HiveInputFormat<K, V> { - public static final Logger LOG = LoggerFactory - .getLogger("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat"); + public static final Logger LOG = LoggerFactory.getLogger(BucketizedHiveInputFormat.class); @Override public RecordReader getRecordReader(InputSplit split, JobConf job, @@ -123,25 +124,34 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W // for each dir, get all files under the dir, do getSplits to each // individual file, // and then create a BucketizedHiveInputSplit on it + + ArrayList<Path> currentDir = null; for (Path dir : dirs) { PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); - // create a new InputFormat instance if this is the first time to see this - // class + // create a new InputFormat instance if this is the first time to see this class Class inputFormatClass = part.getInputFileFormatClass(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); newjob.setInputFormat(inputFormat.getClass()); - FileStatus[] listStatus = listStatus(newjob, dir); - - for (FileStatus status : listStatus) { - 
LOG.info("block size: " + status.getBlockSize()); - LOG.info("file length: " + status.getLen()); - FileInputFormat.setInputPaths(newjob, status.getPath()); - InputSplit[] iss = inputFormat.getSplits(newjob, 0); - if (iss != null && iss.length > 0) { - numOrigSplits += iss.length; - result.add(new BucketizedHiveInputSplit(iss, inputFormatClass - .getName())); + ValidWriteIdList mmIds = null; + // The table desc can be null, e.g. for the truncate table case for non-MM tables. + if (part.getTableDesc() != null) { + // Returns null for non-MM tables; throws IOException if an MM table has no valid write IDs. + mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null); + } + // TODO: should this also handle ACID operation, etc.? seems to miss a lot of stuff from HIF. + Path[] finalDirs = (mmIds == null) ? new Path[] { dir } + : processPathsForMmRead(Lists.newArrayList(dir), newjob, mmIds); + if (finalDirs == null) { + continue; // No valid inputs - possible in MM case. + } + + for (Path finalDir : finalDirs) { + FileStatus[] listStatus = listStatus(newjob, finalDir); + + for (FileStatus status : listStatus) { + numOrigSplits = addBHISplit( + status, inputFormat, inputFormatClass, numOrigSplits, newjob, result); } } } @@ -149,4 +159,18 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W + numOrigSplits + " original splits."); return result.toArray(new BucketizedHiveInputSplit[result.size()]); } + + private int addBHISplit(FileStatus status, InputFormat inputFormat, Class inputFormatClass, + int numOrigSplits, JobConf newjob, ArrayList<InputSplit> result) throws IOException { + LOG.info("block size: " + status.getBlockSize()); + LOG.info("file length: " + status.getLen()); + FileInputFormat.setInputPaths(newjob, status.getPath()); + InputSplit[] iss = inputFormat.getSplits(newjob, 0); + if (iss != null && iss.length > 0) { + numOrigSplits += iss.length; + result.add(new BucketizedHiveInputSplit(iss, inputFormatClass + .getName())); + } + return numOrigSplits; + } }
http://git-wip-us.apache.org/repos/asf/hive/blob/a30a6cac/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6c6eeff..3d965c0 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -461,18 +461,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits, TableDesc table, List<InputSplit> result) throws IOException { - ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName()); - ValidWriteIdList validMmWriteIdList; - if (AcidUtils.isInsertOnlyTable(table.getProperties())) { - if (validWriteIdList == null) { - throw new IOException("Insert-Only table: " + table.getTableName() - + " is missing from the ValidWriteIdList config: " - + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); - } - validMmWriteIdList = validWriteIdList; - } else { - validMmWriteIdList = null; // for non-MM case - } + ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList( + conf, table.getTableName()); + ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList); try { Utilities.copyTablePropertiesToConf(table, conf); @@ -538,6 +529,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } + protected ValidWriteIdList getMmValidWriteIds( + JobConf conf, TableDesc table, ValidWriteIdList validWriteIdList) throws IOException { + if (!AcidUtils.isInsertOnlyTable(table.getProperties())) return null; + if (validWriteIdList == null) { + validWriteIdList = AcidUtils.getTableValidWriteIdList( conf, table.getTableName()); + if (validWriteIdList == null) { + throw 
new IOException("Insert-Only table: " + table.getTableName() + + " is missing from the ValidWriteIdList config: " + + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); + } + } + return validWriteIdList; + } + public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf, ValidWriteIdList validWriteIdList) throws IOException { if (validWriteIdList == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/a30a6cac/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java index 591c4b8..c112978 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java @@ -236,7 +236,9 @@ public class ColumnTruncateMapper extends MapReduceBase implements Path backupPath = backupOutputPath(fs, outputPath, job); Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null, reporter); - fs.delete(backupPath, true); + if (backupPath != null) { + fs.delete(backupPath, true); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/a30a6cac/ql/src/test/queries/clientpositive/mm_bhif.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_bhif.q b/ql/src/test/queries/clientpositive/mm_bhif.q new file mode 100644 index 0000000..f9c7f8a --- /dev/null +++ b/ql/src/test/queries/clientpositive/mm_bhif.q @@ -0,0 +1,27 @@ +SET hive.vectorized.execution.enabled=true; +SET hive.vectorized.execution.reduce.enabled=true; +set hive.mapred.mode=nonstrict; +set hive.exec.reducers.max = 10; +set hive.map.groupby.sorted=true; +set hive.support.concurrency=true; +set 
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +-- SORT_QUERY_RESULTS + +CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string) +CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1'); + +INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'; + + +set hive.fetch.task.conversion=none; + +select * from T1_mm; + +explain +select count(distinct key) from T1_mm; +select count(distinct key) from T1_mm; + +DROP TABLE T1_mm; http://git-wip-us.apache.org/repos/asf/hive/blob/a30a6cac/ql/src/test/results/clientpositive/mm_bhif.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/mm_bhif.q.out b/ql/src/test/results/clientpositive/mm_bhif.q.out new file mode 100644 index 0000000..4774007 --- /dev/null +++ b/ql/src/test/results/clientpositive/mm_bhif.q.out @@ -0,0 +1,146 @@ +PREHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string) +CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@T1_mm +POSTHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string) +CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@T1_mm +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1_mm 
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1_mm +POSTHOOK: Output: default@t1_mm@ds=1 +PREHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1' +PREHOOK: type: QUERY +PREHOOK: Input: default@t1_mm +PREHOOK: Input: default@t1_mm@ds=1 +PREHOOK: Output: default@t1_mm@ds=1 +POSTHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1_mm +POSTHOOK: Input: default@t1_mm@ds=1 +POSTHOOK: Output: default@t1_mm@ds=1 +POSTHOOK: Lineage: t1_mm PARTITION(ds=1).key SIMPLE [(t1_mm)t1_mm.FieldSchema(name:key, type:string, comment:null), ] +POSTHOOK: Lineage: t1_mm PARTITION(ds=1).val SIMPLE [(t1_mm)t1_mm.FieldSchema(name:val, type:string, comment:null), ] +PREHOOK: query: select * from T1_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@t1_mm +PREHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +POSTHOOK: query: select * from T1_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1_mm +POSTHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +1 11 1 +2 12 1 +3 13 1 +7 17 1 +8 18 1 +8 28 1 +PREHOOK: query: explain +select count(distinct key) from T1_mm +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(distinct key) from T1_mm +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: t1_mm + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: key 
(type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: partial2 + outputColumnNames: _col0 + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(_col0) + mode: partial2 + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(distinct key) from T1_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@t1_mm 
+PREHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct key) from T1_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1_mm +POSTHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +5 +PREHOOK: query: DROP TABLE T1_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1_mm +PREHOOK: Output: default@t1_mm +POSTHOOK: query: DROP TABLE T1_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1_mm +POSTHOOK: Output: default@t1_mm
