This is an automated email from the ASF dual-hosted git repository. prasanthj pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 72d72d4 HIVE-21457: Perf optimizations in ORC split-generation (Prasanth Jayachandran reviewed by Gopal V) 72d72d4 is described below commit 72d72d4df734ccc653a0a6986c319200dea35f0b Author: Prasanth Jayachandran <prasan...@apache.org> AuthorDate: Thu Mar 28 13:46:03 2019 -0700 HIVE-21457: Perf optimizations in ORC split-generation (Prasanth Jayachandran reviewed by Gopal V) --- .../apache/hadoop/hive/ql/exec/FetchOperator.java | 2 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 16 ++--- .../hive/ql/io/HiveContextAwareRecordReader.java | 5 +- .../apache/hadoop/hive/ql/io/HiveInputFormat.java | 2 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 71 ++++++++++++++++------ .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 5 +- .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 2 +- .../hadoop/hive/ql/txn/compactor/CompactorMR.java | 7 ++- .../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +- 9 files changed, 77 insertions(+), 35 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index e6b47de..3550747 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -768,7 +768,7 @@ public class FetchOperator implements Serializable { private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException { boolean recursive = job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false); // If this is in acid format always read it recursively regardless of what the jobconf says. - if (!recursive && !AcidUtils.isAcid(p, job)) { + if (!recursive && !AcidUtils.isAcid(fs, p, job)) { return fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER); } List<FileStatus> results = new ArrayList<FileStatus>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 50a233d..af8743d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1080,14 +1080,15 @@ public class AcidUtils { /** * Is the given directory in ACID format? + * @param fileSystem file system instance * @param directory the partition directory to check * @param conf the query configuration * @return true, if it is an ACID directory * @throws IOException */ - public static boolean isAcid(Path directory, + public static boolean isAcid(FileSystem fileSystem, Path directory, Configuration conf) throws IOException { - FileSystem fs = directory.getFileSystem(conf); + FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem; for(FileStatus file: fs.listStatus(directory)) { String filename = file.getPath().getName(); if (filename.startsWith(BASE_PREFIX) || @@ -1106,7 +1107,7 @@ public class AcidUtils { Configuration conf, ValidWriteIdList writeIdList ) throws IOException { - return getAcidState(directory, conf, writeIdList, false, false); + return getAcidState(null, directory, conf, writeIdList, false, false); } /** State class for getChildState; cannot modify 2 things in a method. */ @@ -1122,22 +1123,23 @@ public class AcidUtils { * base and diff directories. Note that because major compactions don't * preserve the history, we can't use a base directory that includes a * write id that we must exclude. + * @param fileSystem file system instance * @param directory the partition directory to analyze * @param conf the configuration * @param writeIdList the list of write ids that we are reading * @return the state of the directory * @throws IOException */ - public static Directory getAcidState(Path directory, + public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, ValidWriteIdList writeIdList, boolean useFileIds, boolean ignoreEmptyFiles ) throws IOException { - return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); + return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); } - public static Directory getAcidState(Path directory, + public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, @@ -1160,7 +1162,7 @@ public class AcidUtils { validTxnList.readFromString(s); } - FileSystem fs = directory.getFileSystem(conf); + FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem; // The following 'deltas' includes all kinds of delta files including insert & delete deltas. final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>(); List<ParsedDelta> working = new ArrayList<ParsedDelta>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index 11876fb..0287bd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -187,8 +187,8 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader long blockStart = -1; FileSplit fileSplit = split; Path path = fileSplit.getPath(); - FileSystem fs = path.getFileSystem(job); if (inputFormatClass.getName().contains("SequenceFile")) { + FileSystem fs = path.getFileSystem(job); SequenceFile.Reader in = new SequenceFile.Reader(fs, path, job); blockPointer = in.isBlockCompressed(); in.sync(fileSplit.getStart()); @@ -198,6 +198,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader blockPointer = true; blockStart = ((RCFileRecordReader) recordReader).getStart(); } else if (inputFormatClass.getName().contains("RCFile")) { + FileSystem fs = path.getFileSystem(job); blockPointer = true; RCFile.Reader in = new RCFile.Reader(fs, path, job); in.sync(fileSplit.getStart()); @@ -205,7 +206,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader in.close(); } this.jobConf = job; - this.initIOContext(blockStart, blockPointer, path.makeQualified(fs)); + this.initIOContext(blockStart, blockPointer, path); this.initIOContextSortedProps(split, recordReader, job); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6bac285..086e59f 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -604,7 +604,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } if (hasAcidDirs) { AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - dir, conf, validWriteIdList, Ref.from(false), true, null); + fs, dir, conf, validWriteIdList, Ref.from(false), true, null); // Find the base, created for IOW. Path base = dirInfo.getBaseDirectory(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 4f7732b..3878bba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -50,6 +50,10 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.common.BlobStorageUtils; +import org.apache.hadoop.hive.common.NoDynamicValuesException; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -194,7 +198,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException { - return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf); + return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(null, path, conf); } @@ -632,6 +636,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private SplitStrategyKind splitStrategyKind; private final SearchArgument sarg; private final AcidOperationalProperties acidOperationalProperties; + private final boolean isAcid; + private final boolean isVectorMode; Context(Configuration conf) throws IOException { this(conf, 1, null); @@ -645,6 +651,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Context(Configuration conf, final int minSplits, ExternalFooterCachesByConf efc) throws IOException { this.conf = conf; + this.isAcid = AcidUtils.isFullAcidScan(conf); + this.isVectorMode = Utilities.getIsVectorized(conf); this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST); this.sarg = ConvertAstToSearchArg.createFromConf(conf); minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, DEFAULT_MIN_SPLIT_SIZE); @@ -653,7 +661,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, if (ss == null || ss.equals(SplitStrategyKind.HYBRID.name())) { splitStrategyKind = SplitStrategyKind.HYBRID; } else { - LOG.info("Enforcing " + ss + " ORC split strategy"); splitStrategyKind = SplitStrategyKind.valueOf(ss); } footerInSplits = HiveConf.getBoolVar(conf, @@ -661,7 +668,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, numBuckets = Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0); splitStrategyBatchMs = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS); - LOG.debug("Number of buckets specified by conf file is " + numBuckets); long cacheMemSize = HiveConf.getSizeVar( conf, ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE); int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS); @@ -723,8 +729,37 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value); - LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString() - + " isTransactionalTable: " + isTxnTable + " properties: " + txnProperties); + LOG.info("Context:: " + + "isAcid: {} " + + "isVectorMode: {} " + + "sarg: {} " + + "minSplitSize: {} " + + "maxSplitSize: {} " + + "splitStrategy: {} " + + "footerInSplits: {} " + + "numBuckets: {} " + + "numThreads: {} " + + "cacheMemSize: {} " + + "cacheStripeDetails: {} " + + "useSoftReference: {} " + + "writeIdList: {} " + + "isTransactionalTable: {} " + + "txnProperties: {} ", + isAcid, + isVectorMode, + sarg, + minSize, + maxSize, + splitStrategyKind, + footerInSplits, + numBuckets, + numThreads, + cacheMemSize, + cacheStripeDetails, + useSoftReference, + writeIdList, + isTxnTable, + txnProperties); } @VisibleForTesting @@ -1044,6 +1079,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final boolean allowSyntheticFileIds; private final boolean isDefaultFs; private final Configuration conf; + private final boolean isAcid; + private final boolean vectorMode; /** * @param dir - root of partition dir @@ -1060,13 +1097,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, this.allowSyntheticFileIds = allowSyntheticFileIds; this.isDefaultFs = isDefaultFs; this.conf = context.conf; + this.isAcid = context.isAcid; + this.vectorMode = context.isVectorMode; } @Override public List<OrcSplit> getSplits() throws IOException { List<OrcSplit> splits = Lists.newArrayList(); - boolean isAcid = AcidUtils.isFullAcidScan(conf); - boolean vectorMode = Utilities.getIsVectorized(conf); OrcSplit.OffsetAndBucketProperty offsetAndBucket = null; for (HdfsFileStatusWithId file : fileStatuses) { if (isOriginal && isAcid && vectorMode) { @@ -1083,7 +1120,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) { final long splitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_ORC_BLOB_STORAGE_SPLIT_SIZE); - LOG.info("Blob storage detected for BI split strategy. Splitting files at boundary {}..", splitSize); + if (LOG.isDebugEnabled()) { + LOG.debug("Blob storage detected for BI split strategy. Splitting files at boundary {}..", splitSize); + } long start; for (start = 0; start < logicalLen; start = start + splitSize) { OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, start, @@ -1234,7 +1273,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - dir, context.conf, context.writeIdList, useFileIds, true, null); + fs, dir, context.conf, context.writeIdList, useFileIds, true, null); // find the base files (original or new style) List<AcidBaseFileInfo> baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { @@ -1549,8 +1588,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } private List<OrcSplit> callInternal() throws IOException { - boolean isAcid = AcidUtils.isFullAcidScan(context.conf); - boolean vectorMode = Utilities.getIsVectorized(context.conf); + boolean isAcid = context.isAcid; + boolean vectorMode = context.isVectorMode; if (isOriginal && isAcid && vectorMode) { offsetAndBucket = VectorizedOrcAcidRowBatchReader.computeOffsetAndBucket(file, rootDir, isOriginal, @@ -1964,9 +2003,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits started"); - } + long start = System.currentTimeMillis(); + LOG.info("getSplits started"); Configuration conf = job; if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) { // Create HiveConf once, since this is expensive. @@ -1974,9 +2012,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } List<OrcSplit> result = generateSplitsInfo(conf, new Context(conf, numSplits, createExternalCaches())); - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits finished"); - } + long end = System.currentTimeMillis(); + LOG.info("getSplits finished (#splits: {}). duration: {} ms", result.size(), (end - start)); return result.toArray(new InputSplit[result.size()]); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index fc7e7e1..fbbddba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -463,7 +463,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ */ //the split is from something other than the 1st file of the logical bucket - compute offset AcidUtils.Directory directoryState - = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true); + = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, validWriteIdList, false, + true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { @@ -577,7 +578,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ assert options.getOffset() == 0; assert options.getMaxOffset() == Long.MAX_VALUE; AcidUtils.Directory directoryState - = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true); + = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, validWriteIdList, false, true); /** * Note that for reading base_x/ or delta_x_x/ with non-acid schema, * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index dc18ba1..18c35f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -722,7 +722,7 @@ public class VectorizedOrcAcidRowBatchReader int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf) //statementId is from directory name (or 0 if there is none) .statementId(syntheticTxnInfo.statementId).bucket(bucketId)); - AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf, + AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf, validWriteIdList, false, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 1019285..f323c31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -266,7 +266,7 @@ public class CompactorMR { // and discovering that in getSplits is too late as we then have no way to pass it to our // mapper. - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, false, true); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, false, true); List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories(); int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA); if(parsedDeltas.size() > maxDeltastoHandle) { @@ -345,7 +345,8 @@ public class CompactorMR { private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters())); - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(false), false, + AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, + Ref.from(false), false, t.getParameters()); int deltaCount = dir.getCurrentDirectories().size(); int origCount = dir.getOriginalFiles().size(); @@ -415,7 +416,7 @@ public class CompactorMR { StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { LOG.debug("Going to delete directories for aborted transactions for MM table " + t.getDbName() + "." + t.getTableName()); - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), + AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, t.getParameters()); removeFilesForMmTable(conf, dir); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index deabec6..6168fc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -291,7 +291,7 @@ public class Initiator extends MetaStoreCompactorThread { boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, writeIds, false, false); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, false, false); Path base = dir.getBaseDirectory(); long baseSize = 0; FileStatus stat = null;