Repository: hive Updated Branches: refs/heads/master 77f30d4f9 -> bc1c434f1
HIVE-11553 : use basic file metadata cache in ETLSplitStrategy-related paths (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bc1c434f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bc1c434f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bc1c434f Branch: refs/heads/master Commit: bc1c434f1ffc70dc2d52f34839c6f54b0b0c8d10 Parents: 77f30d4 Author: Sergey Shelukhin <[email protected]> Authored: Fri Oct 2 11:11:36 2015 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Fri Oct 2 13:11:28 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 9 +- .../hadoop/hive/metastore/HiveMetaStore.java | 20 +- .../hive/metastore/HiveMetaStoreClient.java | 94 +++++ .../hadoop/hive/metastore/IMetaStoreClient.java | 20 ++ .../hive/metastore/hbase/HBaseReadWrite.java | 9 +- ql/pom.xml | 3 + .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +- .../hive/ql/exec/tez/HiveSplitGenerator.java | 4 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 360 +++++++++++++++---- .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 2 +- .../hadoop/hive/ql/io/orc/ReaderImpl.java | 3 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 44 ++- 12 files changed, 484 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e7ed07e..77ca613 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -153,7 +153,7 @@ public class HiveConf extends Configuration { HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS, HiveConf.ConfVars.METASTORE_PART_INHERIT_TBL_PROPS, - HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX, + HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX, HiveConf.ConfVars.METASTORE_INIT_HOOKS, HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS, HiveConf.ConfVars.HMSHANDLERATTEMPTS, @@ -567,9 +567,9 @@ public class HiveConf extends Configuration { "Maximum number of objects (tables/partitions) can be retrieved from metastore in one batch. \n" + "The higher the number, the less the number of round trips is needed to the Hive metastore server, \n" + "but it may also cause higher memory requirement at the client side."), - METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX( + METASTORE_BATCH_RETRIEVE_OBJECTS_MAX( "hive.metastore.batch.retrieve.table.partition.max", 1000, - "Maximum number of table partitions that metastore internally retrieves in one batch."), + "Maximum number of objects that metastore internally retrieves in one batch."), METASTORE_INIT_HOOKS("hive.metastore.init.hooks", "", "A comma separated list of hooks to be invoked at the beginning of HMSHandler initialization. \n" + @@ -1077,6 +1077,9 @@ public class HiveConf extends Configuration { " (split generation reads and caches file footers). 
HYBRID chooses between the above strategies" + " based on heuristics."), + HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false, + "Whether to enable using file metadata cache in metastore for ORC file footers."), + HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false, "If turned on splits generated by orc will include metadata about the stripes in the file. This\n" + "data is read remotely (from the client or HS2 machine) and sent to all the tasks."), http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 815f499..8cd1f52 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -208,6 +208,7 @@ import org.apache.thrift.transport.TTransportFactory; import javax.jdo.JDOException; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -5718,12 +5719,29 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (metadatas != null) { assert metadatas.length == fileIds.size(); for (int i = 0; i < metadatas.length; ++i) { - result.putToMetadata(fileIds.get(i), metadatas[i]); + ByteBuffer bb = metadatas[i]; + if (bb == null) continue; + bb = handleReadOnlyBufferForThrift(bb); + result.putToMetadata(fileIds.get(i), bb); } } + if (!result.isSetMetadata()) { + result.setMetadata(new HashMap<Long, ByteBuffer>()); // Set the required field. + } return result; } + private ByteBuffer handleReadOnlyBufferForThrift(ByteBuffer bb) { + if (!bb.isReadOnly()) return bb; + // Thrift cannot write read-only buffers... oh well. + // TODO: actually thrift never writes to the buffer, so we could use reflection to + // unset the unnecessary read-only flag if allocation/copy perf becomes a problem. 
+ ByteBuffer copy = ByteBuffer.allocate(bb.capacity()); + copy.put(bb); + copy.flip(); + return copy; + } + @Override public PutFileMetadataResult put_file_metadata(PutFileMetadataRequest req) throws TException { getMS().putFileMetadata(req.getFileIds(), req.getMetadata()); http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 6f15fd0..8e32966 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -35,9 +35,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -61,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.AddPartitionsResult; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; @@ -76,6 +80,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult; import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; @@ -116,6 +122,7 @@ import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.PrivilegeBag; +import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest; import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; @@ -170,6 +177,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient { private String tokenStrForm; private final boolean localMetaStore; private final MetaStoreFilterHook filterHook; + private final int fileMetadataBatchSize; private Map<String, String> currentMetaVars; @@ -195,6 +203,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient { } this.conf = conf; filterHook = loadFilterHooks(); + fileMetadataBatchSize = HiveConf.getIntVar( + conf, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX); String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS); localMetaStore = 
HiveConfUtil.isEmbeddedMetaStore(msUri); @@ -2108,4 +2118,88 @@ public class HiveMetaStoreClient implements IMetaStoreClient { PartitionsStatsRequest req = new PartitionsStatsRequest(dbName, tblName, colNames, partNames); return client.get_aggr_stats_for(req); } + + @Override + public Iterable<Entry<Long, ByteBuffer>> getFileMetadata( + final List<Long> fileIds) throws TException { + return new MetastoreMapIterable<Long, ByteBuffer>() { + private int listIndex = 0; + @Override + protected Map<Long, ByteBuffer> fetchNextBatch() throws TException { + if (listIndex == fileIds.size()) return null; + int endIndex = Math.min(listIndex + fileMetadataBatchSize, fileIds.size()); + List<Long> subList = fileIds.subList(listIndex, endIndex); + GetFileMetadataRequest req = new GetFileMetadataRequest(); + req.setFileIds(subList); + GetFileMetadataResult resp = client.get_file_metadata(req); + listIndex = endIndex; + return resp.getMetadata(); + } + }; + } + + public static abstract class MetastoreMapIterable<K, V> + implements Iterable<Entry<K, V>>, Iterator<Entry<K, V>> { + private Iterator<Entry<K, V>> currentIter; + + protected abstract Map<K, V> fetchNextBatch() throws TException; + + @Override + public Iterator<Entry<K, V>> iterator() { + return this; + } + + @Override + public boolean hasNext() { + ensureCurrentBatch(); + return currentIter != null; + } + + private void ensureCurrentBatch() { + if (currentIter != null && currentIter.hasNext()) return; + currentIter = null; + Map<K, V> currentBatch; + do { + try { + currentBatch = fetchNextBatch(); + } catch (TException ex) { + throw new RuntimeException(ex); + } + if (currentBatch == null) return; // No more data. + } while (currentBatch.isEmpty()); + currentIter = currentBatch.entrySet().iterator(); + } + + @Override + public Entry<K, V> next() { + ensureCurrentBatch(); + if (currentIter == null) throw new NoSuchElementException(); + return currentIter.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @Override + public void clearFileMetadata(List<Long> fileIds) throws TException { + ClearFileMetadataRequest req = new ClearFileMetadataRequest(); + req.setFileIds(fileIds); + client.clear_file_metadata(req); + } + + @Override + public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata) throws TException { + PutFileMetadataRequest req = new PutFileMetadataRequest(); + req.setFileIds(fileIds); + req.setMetadata(metadata); + client.put_file_metadata(req); + } + + @Override + public boolean isSameConfObj(HiveConf c) { + return conf == c; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index e4a6cdb..77820ae 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -79,8 +79,10 @@ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.thrift.TException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Map.Entry; /** * Wrapper around hive metastore thrift api @@ -1459,4 +1461,22 @@ public interface IMetaStoreClient { * flush statistics objects. 
This should be called at the beginning of each query. */ void flushCache(); + + /** + * Gets file metadata, as cached by metastore, for respective file IDs. + * The metadata that is not cached in metastore may be missing. + */ + Iterable<Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds) throws TException; + + /** + * Clears the file metadata cache for respective file IDs. + */ + void clearFileMetadata(List<Long> fileIds) throws TException; + + /** + * Adds file metadata for respective file IDs to metadata cache in metastore. + */ + void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata) throws TException; + + boolean isSameConfObj(HiveConf c); } http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index d38c561..f69b4c7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -1959,7 +1960,13 @@ public class HBaseReadWrite { Result[] results = htab.get(gets); for (int i = 0; i < results.length; ++i) { Result r = results[i]; - resultDest[i] = (r.isEmpty() ?
null : r.getValueAsByteBuffer(colFam, colName)); + if (r.isEmpty()) { + resultDest[i] = null; + } else { + Cell cell = r.getColumnLatestCell(colFam, colName); + resultDest[i] = ByteBuffer.wrap( + cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index 36b3433..587e2ee 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -704,9 +704,12 @@ <include>org.apache.hive:hive-common</include> <include>org.apache.hive:hive-exec</include> <include>org.apache.hive:hive-serde</include> + <include>org.apache.hive:hive-metastore</include> <include>com.esotericsoftware.kryo:kryo</include> <include>org.apache.parquet:parquet-hadoop-bundle</include> <include>org.apache.thrift:libthrift</include> + <include>org.apache.thrift:libfb303</include> + <include>javax.jdo:jdo-api</include> <include>commons-lang:commons-lang</include> <include>org.apache.commons:commons-lang3</include> <include>org.jodd:jodd-core</include> http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 3511e73..74007e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3635,7 +3635,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } int partitionBatchSize = HiveConf.getIntVar(conf, - ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX); + ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX); // drop the table db.dropTable(dropTbl.getTableName(), dropTbl.getIfPurge()); http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 87881b6..0a43c15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -91,10 +91,10 @@ public class HiveSplitGenerator extends InputInitializer { userPayloadProto = MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload()); - this.conf = - TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes()); + this.conf = TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes()); this.jobConf = new JobConf(conf); + // Read all credentials into the credentials instance stored in JobConf. 
ShimLoader.getHadoopShims().getMergedCredentials(jobConf); http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 57bde3e..ef62a23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -19,11 +19,14 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -34,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.codec.binary.Hex; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -57,10 +61,13 @@ import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion; +import org.apache.hadoop.hive.ql.io.orc.ReaderImpl.FileMetaInfo; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -110,7 +117,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, InputFormatChecker, VectorizedInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination { - static enum SplitStrategyKind{ + static enum SplitStrategyKind { HYBRID, BI, ETL @@ -441,7 +448,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, */ static class Context { private final Configuration conf; - private static Cache<Path, FileInfo> footerCache; + + // We store all caches in variables to change the main one based on config. + // This is not thread safe between different split generations (and wasn't anyway). + private static FooterCache footerCache; + private static LocalCache localCache; + private static MetastoreCache metaCache; private static ExecutorService threadPool = null; private final int numBuckets; private final long maxSize; @@ -490,13 +502,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, .setNameFormat("ORC_GET_SPLITS #%d").build()); } - if (footerCache == null && cacheStripeDetails) { - footerCache = CacheBuilder.newBuilder() - .concurrencyLevel(numThreads) - .initialCapacity(cacheStripeDetailsSize) - .maximumSize(cacheStripeDetailsSize) - .softValues() - .build(); + // TODO: local cache is created once, so the configs for future queries will not be honored. 
+ if (cacheStripeDetails) { + // Note that there's no FS check here; we implicitly only use metastore cache for + // HDFS, because only HDFS would return fileIds for us. If fileId is extended using + // size/mod time/etc. for other FSes, we might need to check FSes explicitly because + // using such an aggregate fileId cache is not bulletproof and should be disable-able. + boolean useMetastoreCache = HiveConf.getBoolVar( + conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED); + if (localCache == null) { + localCache = new LocalCache(numThreads, cacheStripeDetailsSize); + } + if (useMetastoreCache) { + if (metaCache == null) { + metaCache = new MetastoreCache(localCache); + } + assert conf instanceof HiveConf; + metaCache.configure((HiveConf)conf); + } + // Set footer cache for current split generation. See field comment - not thread safe. + footerCache = useMetastoreCache ? metaCache : localCache; } } String value = conf.get(ValidTxnList.VALID_TXNS_KEY, @@ -591,49 +616,37 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, this.covered = covered; } - private FileInfo verifyCachedFileInfo(FileStatus file) { - FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath()); - if (fileInfo != null) { - if (isDebugEnabled) { - LOG.debug("Info cached for path: " + file.getPath()); - } - if (fileInfo.modificationTime == file.getModificationTime() && - fileInfo.size == file.getLen()) { - // Cached copy is valid - context.cacheHitCounter.incrementAndGet(); - return fileInfo; - } else { - // Invalidate - Context.footerCache.invalidate(file.getPath()); - if (isDebugEnabled) { - LOG.debug("Meta-Info for : " + file.getPath() + - " changed. CachedModificationTime: " - + fileInfo.modificationTime + ", CurrentModificationTime: " - + file.getModificationTime() - + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + - file.getLen()); - } - } - } else { - if (isDebugEnabled) { - LOG.debug("Info not cached for path: " + file.getPath()); - } - } - return null; - } - @Override public List<SplitInfo> getSplits() throws IOException { - List<SplitInfo> result = Lists.newArrayList(); - for (HdfsFileStatusWithId file : files) { - FileInfo info = null; - if (context.cacheStripeDetails) { - info = verifyCachedFileInfo(file.getFileStatus()); + List<SplitInfo> result = new ArrayList<>(files.size()); + // TODO: Right now, we do the metastore call here, so there will be a metastore call per + // partition. If we had a sync point after getting file lists, we could make just one + // call; this might be too much sync for many partitions and also cause issues with the + // huge metastore call result that cannot be handled with in-API batching. To have an + // optimal number of metastore calls, we should wait for batch-size number of files (a + // few hundreds) to become available, then call metastore. 
+ if (context.cacheStripeDetails) { + FileInfo[] infos = Context.footerCache.getAndValidate(files); + for (int i = 0; i < files.size(); ++i) { + FileInfo info = infos[i]; + if (info != null) { + // Cached copy is valid + context.cacheHitCounter.incrementAndGet(); + } + HdfsFileStatusWithId file = files.get(i); + // ignore files of 0 length + if (file.getFileStatus().getLen() > 0) { + result.add(new SplitInfo( + context, fs, file, info, isOriginal, deltas, true, dir, covered)); + } } - // ignore files of 0 length - if (file.getFileStatus().getLen() > 0) { - result.add(new SplitInfo( - context, fs, file, info, isOriginal, deltas, true, dir, covered)); + } else { + for (HdfsFileStatusWithId file : files) { + // ignore files of 0 length + if (file.getFileStatus().getLen() > 0) { + result.add(new SplitInfo( + context, fs, file, null, isOriginal, deltas, true, dir, covered)); + } } } return result; @@ -808,7 +821,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, this.file = this.fileWithId.getFileStatus(); this.blockSize = this.file.getBlockSize(); this.fileInfo = splitInfo.fileInfo; - locations = SHIMS.getLocationsWithOffset(fs, file); + locations = SHIMS.getLocationsWithOffset(fs, file); // TODO: potential DFS call this.isOriginal = splitInfo.isOriginal; this.deltas = splitInfo.deltas; this.hasBase = splitInfo.hasBase; @@ -990,41 +1003,51 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } private void populateAndCacheStripeDetails() throws IOException { - Reader orcReader = OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(context.conf).filesystem(fs)); + // Only create OrcReader if we are missing some information. + OrcProto.Footer footer; if (fileInfo != null) { stripes = fileInfo.stripeInfos; fileMetaInfo = fileInfo.fileMetaInfo; metadata = fileInfo.metadata; - types = fileInfo.types; + types = fileInfo.footer.getTypesList(); writerVersion = fileInfo.writerVersion; + footer = fileInfo.footer; // For multiple runs, in case sendSplitsInFooter changes if (fileMetaInfo == null && context.footerInSplits) { + Reader orcReader = createOrcReader(); fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo(); - fileInfo.metadata = orcReader.getMetadata(); - fileInfo.types = orcReader.getTypes(); - fileInfo.writerVersion = orcReader.getWriterVersion(); + assert fileInfo.metadata != null && fileInfo.footer != null + && fileInfo.writerVersion != null; + footer = fileInfo.footer; + // We assume that if we needed to create a reader, we need to cache it to meta cache. + // TODO: This will also needlessly overwrite it in local cache for now. + Context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader); } } else { + Reader orcReader = createOrcReader(); stripes = orcReader.getStripes(); metadata = orcReader.getMetadata(); types = orcReader.getTypes(); writerVersion = orcReader.getWriterVersion(); fileMetaInfo = context.footerInSplits ? ((ReaderImpl) orcReader).getFileMetaInfo() : null; + footer = orcReader.getFooter(); if (context.cacheStripeDetails) { - // Populate into cache. 
- Context.footerCache.put(file.getPath(), - new FileInfo(file.getModificationTime(), file.getLen(), stripes, - metadata, types, fileMetaInfo, writerVersion)); + Long fileId = fileWithId.getFileId(); + Context.footerCache.put(fileId, file, fileMetaInfo, orcReader); } } includedCols = genIncludedColumns(types, context.conf, isOriginal); - projColsUncompressedSize = computeProjectionSize(orcReader, includedCols, isOriginal); + projColsUncompressedSize = computeProjectionSize(footer, includedCols, isOriginal); + } + + private Reader createOrcReader() throws IOException { + return OrcFile.createReader(file.getPath(), + OrcFile.readerOptions(context.conf).filesystem(fs)); } - private long computeProjectionSize(final Reader orcReader, final boolean[] includedCols, - final boolean isOriginal) { + private long computeProjectionSize( + OrcProto.Footer footer, final boolean[] includedCols, final boolean isOriginal) { final int rootIdx = getRootColumn(isOriginal); List<Integer> internalColIds = Lists.newArrayList(); if (includedCols != null) { @@ -1034,7 +1057,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } } - return orcReader.getRawDataSizeFromColIndices(internalColIds); + return ReaderImpl.getRawDataSizeFromColIndices(internalColIds, footer); } } @@ -1045,7 +1068,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits) throws IOException { - // use threads to resolve directories into splits + // Use threads to resolve directories into splits. + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) { + // Create HiveConf once, since this is expensive. + conf = new HiveConf(conf, OrcInputFormat.class); + } Context context = new Context(conf, numSplits); List<OrcSplit> splits = Lists.newArrayList(); List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList(); @@ -1072,6 +1099,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, LOG.debug(splitStrategy); } + // Hack note - different split strategies return differently typed lists, yay Java. + // This works purely by magic, because we know which strategy produces which type. 
if (splitStrategy instanceof ETLSplitStrategy) { List<SplitInfo> splitInfos = ((ETLSplitStrategy)splitStrategy).getSplits(); for (SplitInfo splitInfo : splitInfos) { @@ -1134,26 +1163,28 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, * */ private static class FileInfo { - long modificationTime; - long size; - List<StripeInformation> stripeInfos; - ReaderImpl.FileMetaInfo fileMetaInfo; - Metadata metadata; - List<OrcProto.Type> types; + private final long modificationTime; + private final long size; + private final Long fileId; + private final List<StripeInformation> stripeInfos; + private ReaderImpl.FileMetaInfo fileMetaInfo; + private Metadata metadata; + private OrcProto.Footer footer; private OrcFile.WriterVersion writerVersion; FileInfo(long modificationTime, long size, List<StripeInformation> stripeInfos, - Metadata metadata, List<OrcProto.Type> types, + Metadata metadata, OrcProto.Footer footer, ReaderImpl.FileMetaInfo fileMetaInfo, - OrcFile.WriterVersion writerVersion) { + OrcFile.WriterVersion writerVersion, Long fileId) { this.modificationTime = modificationTime; this.size = size; + this.fileId = fileId; this.stripeInfos = stripeInfos; this.fileMetaInfo = fileMetaInfo; this.metadata = metadata; - this.types = types; + this.footer = footer; this.writerVersion = writerVersion; } } @@ -1513,5 +1544,186 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, bucket, validTxnList, new Reader.Options(), deltaDirectory); } + /** + * Represents footer cache. + */ + public interface FooterCache { + FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException; + void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader) + throws IOException; + } + + /** Local footer cache using Guava. Stores convoluted Java objects. */ + private static class LocalCache implements FooterCache { + private Cache<Path, FileInfo> cache; + + public LocalCache(int numThreads, int cacheStripeDetailsSize) { + cache = CacheBuilder.newBuilder() + .concurrencyLevel(numThreads) + .initialCapacity(cacheStripeDetailsSize) + .maximumSize(cacheStripeDetailsSize) + .softValues() + .build(); + } + + @Override + public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) { + // TODO: should local cache also be by fileId? Preserve the original logic for now. + FileInfo[] result = new FileInfo[files.size()]; + int i = -1; + for (HdfsFileStatusWithId fileWithId : files) { + ++i; + FileStatus file = fileWithId.getFileStatus(); + Path path = file.getPath(); + Long fileId = fileWithId.getFileId(); + FileInfo fileInfo = cache.getIfPresent(path); + if (isDebugEnabled) { + LOG.debug("Info " + (fileInfo == null ? "not " : "") + "cached for path: " + path); + } + if (fileInfo == null) continue; + if ((fileId != null && fileInfo.fileId != null && fileId == fileInfo.fileId) + || (fileInfo.modificationTime == file.getModificationTime() && + fileInfo.size == file.getLen())) { + result[i] = fileInfo; + continue; + } + // Invalidate + cache.invalidate(path); + if (isDebugEnabled) { + LOG.debug("Meta-Info for : " + path + " changed. 
CachedModificationTime: " + + fileInfo.modificationTime + ", CurrentModificationTime: " + + file.getModificationTime() + ", CachedLength: " + fileInfo.size + + ", CurrentLength: " + file.getLen()); + } + } + return result; + } + + public void put(Path path, FileInfo fileInfo) { + cache.put(path, fileInfo); + } + + @Override + public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader) + throws IOException { + cache.put(file.getPath(), new FileInfo(file.getModificationTime(), file.getLen(), + orcReader.getStripes(), orcReader.getMetadata(), orcReader.getFooter(), fileMetaInfo, + orcReader.getWriterVersion(), fileId)); + } + } + + /** Metastore-based footer cache storing serialized footers. Also has a local cache. */ + public static class MetastoreCache implements FooterCache { + private final LocalCache localCache; + private boolean isWarnLogged = false; + private HiveConf conf; + + public MetastoreCache(LocalCache lc) { + localCache = lc; + } + @Override + public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException { + // First, check the local cache. + FileInfo[] result = localCache.getAndValidate(files); + assert result.length == files.size(); + // This is an unfortunate consequence of batching/iterating thru MS results. + // TODO: maybe have a direct map call for small lists if this becomes a perf issue. + HashMap<Long, Integer> posMap = new HashMap<>(files.size()); + for (int i = 0; i < result.length; ++i) { + if (result[i] != null) continue; + HdfsFileStatusWithId file = files.get(i); + Long fileId = file.getFileId(); + if (fileId == null) { + if (!isWarnLogged || isDebugEnabled) { + LOG.warn("Not using metastore cache because fileId is missing: " + + file.getFileStatus().getPath()); + isWarnLogged = true; + } + continue; + } + posMap.put(fileId, i); + } + Iterator<Entry<Long, ByteBuffer>> iter = null; + Hive hive; + try { + hive = getHive(); + iter = hive.getFileMetadata(Lists.newArrayList(posMap.keySet()), conf).iterator(); + } catch (HiveException ex) { + throw new IOException(ex); + } + List<Long> corruptIds = null; + while (iter.hasNext()) { + Entry<Long, ByteBuffer> e = iter.next(); + int ix = posMap.get(e.getKey()); + assert result[ix] == null; + HdfsFileStatusWithId file = files.get(ix); + assert file.getFileId() == e.getKey(); + result[ix] = createFileInfoFromMs(file, e.getValue()); + if (result[ix] == null) { + if (corruptIds == null) { + corruptIds = new ArrayList<>(); + } + corruptIds.add(file.getFileId()); + } else { + localCache.put(file.getFileStatus().getPath(), result[ix]); + } + } + if (corruptIds != null) { + try { + hive.clearFileMetadata(corruptIds); + } catch (HiveException ex) { + LOG.error("Failed to clear corrupt cache data", ex); + } + } + return result; + } + + private Hive getHive() throws HiveException { + // TODO: we wish we could cache the Hive object, but it's not thread safe, and each + // threadlocal we "cache" would need to be reinitialized for every query. This is + // a huge PITA. Hive object will be cached internally, but the compat check will be + // done every time inside get(). 
+ return Hive.getWithFastCheck(conf); + } + + private static FileInfo createFileInfoFromMs( + HdfsFileStatusWithId file, ByteBuffer bb) throws IOException { + FileStatus fs = file.getFileStatus(); + ReaderImpl.FooterInfo fi = null; + ByteBuffer original = bb.duplicate(); + try { + fi = ReaderImpl.extractMetaInfoFromFooter(bb, fs.getPath()); + } catch (Exception ex) { + byte[] data = new byte[original.remaining()]; + System.arraycopy(original.array(), original.arrayOffset() + original.position(), + data, 0, data.length); + String msg = "Failed to parse the footer stored in cache for file ID " + + file.getFileId() + " " + original + " [ " + Hex.encodeHexString(data) + " ]"; + LOG.error(msg, ex); + return null; + } + return new FileInfo(fs.getModificationTime(), fs.getLen(), fi.getStripes(), + fi.getMetadata(), fi.getFooter(), fi.getFileMetaInfo(), + fi.getFileMetaInfo().writerVersion, file.getFileId()); + } + + @Override + public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader) + throws IOException { + localCache.put(fileId, file, fileMetaInfo, orcReader); + if (fileId != null) { + try { + getHive().putFileMetadata(Lists.newArrayList(fileId), + Lists.newArrayList(((ReaderImpl)orcReader).getSerializedFileFooter())); + } catch (HiveException e) { + throw new IOException(e); + } + } + } + + public void configure(HiveConf queryConfig) { + this.conf = queryConfig; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index cc03df7..8bcda75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -153,7 +153,7 @@ public class OrcSplit extends FileSplit { } } - ReaderImpl.FileMetaInfo getFileMetaInfo(){ + public ReaderImpl.FileMetaInfo getFileMetaInfo() { return fileMetaInfo; } http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 3bac48a..a36027e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -233,8 +233,7 @@ public class ReaderImpl implements Reader { throw new FileFormatException("Malformed ORC file " + path + ". Invalid postscript length " + psLen); } - int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - - len; + int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len; byte[] array = buffer.array(); // now look for the magic string at the end of the postscript. 
if (!Text.decode(array, offset, len).equals(OrcFile.MAGIC)) { http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 8efbb05..3ea8e25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -103,6 +103,7 @@ import org.apache.thrift.TException; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; @@ -241,15 +242,32 @@ public class Hive { * */ public static Hive get(HiveConf c) throws HiveException { + return getInternal(c, false); + } + + /** + * Same as {@link #get(HiveConf)}, except that it checks only the object identity of existing + * MS client, assuming the relevant settings would be unchanged within the same conf object. + */ + public static Hive getWithFastCheck(HiveConf c) throws HiveException { + return getInternal(c, true); + } + + private static Hive getInternal(HiveConf c, boolean isFastCheck) throws HiveException { Hive db = hiveDB.get(); if (db == null || !db.isCurrentUserOwner() || - (db.metaStoreClient != null && !db.metaStoreClient.isCompatibleWith(c))) { + (db.metaStoreClient != null && !isCompatible(db, c, isFastCheck))) { return get(c, true); } db.conf = c; return db; } + private static boolean isCompatible(Hive db, HiveConf c, boolean isFastCheck) { + return isFastCheck + ? db.metaStoreClient.isSameConfObj(c) : db.metaStoreClient.isCompatibleWith(c); + } + /** * get a connection to metastore. see get(HiveConf) function for comments * @@ -3339,4 +3357,28 @@ private void constructOneLBLocationMap(FileStatus fSta, return true; } + public Iterable<Map.Entry<Long, ByteBuffer>> getFileMetadata( + List<Long> fileIds, Configuration conf) throws HiveException { + try { + return getMSC().getFileMetadata(fileIds); + } catch (TException e) { + throw new HiveException(e); + } + } + + public void clearFileMetadata(List<Long> fileIds) throws HiveException { + try { + getMSC().clearFileMetadata(fileIds); + } catch (TException e) { + throw new HiveException(e); + } + } + + public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata) throws HiveException { + try { + getMSC().putFileMetadata(fileIds, metadata); + } catch (TException e) { + throw new HiveException(e); + } + } };
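
For readers tracing the new client surface in this patch: the commit adds getFileMetadata, putFileMetadata, and clearFileMetadata to IMetaStoreClient, with HiveMetaStoreClient batching the fetches through MetastoreMapIterable. Below is a minimal caller-side sketch of that API; the client setup and the literal file IDs are illustrative assumptions (real callers such as OrcInputFormat obtain IDs from HdfsFileStatusWithId during split generation), and only the three metadata calls come from this patch.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;

public class FileMetadataCacheSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    IMetaStoreClient msc = new HiveMetaStoreClient(conf);

    // Hypothetical HDFS file IDs; in OrcInputFormat these come from
    // HdfsFileStatusWithId.getFileId() during split generation.
    List<Long> fileIds = Arrays.asList(101L, 102L, 103L);

    // Fetch whatever footers the metastore has cached. Requests are issued in
    // batches of METASTORE_BATCH_RETRIEVE_OBJECTS_MAX behind the iterator;
    // files with no cached metadata are simply absent from the result.
    for (Entry<Long, ByteBuffer> e : msc.getFileMetadata(fileIds)) {
      System.out.println("file " + e.getKey() + ": "
          + e.getValue().remaining() + " bytes of cached footer");
    }

    // A caller that has just read a footer itself can publish it to the cache:
    //   msc.putFileMetadata(someFileIds, someSerializedFooters);
    // and entries that fail to parse can be dropped, as MetastoreCache does:
    //   msc.clearFileMetadata(corruptFileIds);

    msc.close();
  }
}
```

Entries the metastore has not cached are simply missing from the iterable, which is why OrcInputFormat falls back to creating an ORC reader for those files and then calls put to repopulate the cache.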

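On the split-generation side, the new behavior is gated by configuration read in OrcInputFormat.Context and generateSplitsInfo. Here is a short sketch of the relevant knobs, set programmatically purely for illustration; the property and ConfVars names come from this commit, and setting them via `set ...` in a session should have the same effect, since split generation reads them from the job configuration.

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class OrcFooterCacheConfigSketch {
  public static HiveConf configure() {
    HiveConf conf = new HiveConf();

    // New in this commit: let OrcInputFormat's ETL strategy consult the
    // metastore file-metadata cache (MetastoreCache) before reading footers.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED, true);

    // Pre-existing knob: also ship footers inside the generated OrcSplits so
    // tasks do not re-read them remotely.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS, true);

    // Renamed here from METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX; controls
    // how many file IDs HiveMetaStoreClient requests per get_file_metadata call.
    HiveConf.setIntVar(conf, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX, 1000);

    return conf;
  }
}
```

Note that the metastore cache is only consulted for files that expose a fileId (in practice HDFS); for other filesystems, and whenever the flag is off, the per-process Guava LocalCache keyed by path is still used.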