http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java deleted file mode 100644 index 2464376..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java +++ /dev/null @@ -1,1958 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.BlockStorageLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.VolumeId; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.ColumnDef; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.LiteralExpr; -import com.cloudera.impala.analysis.NullLiteral; -import com.cloudera.impala.analysis.NumericLiteral; -import com.cloudera.impala.analysis.PartitionKeyValue; -import com.cloudera.impala.catalog.HdfsPartition.BlockReplica; -import com.cloudera.impala.catalog.HdfsPartition.FileBlock; -import com.cloudera.impala.catalog.HdfsPartition.FileDescriptor; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.FileSystemUtil; -import com.cloudera.impala.common.Pair; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.thrift.ImpalaInternalServiceConstants; -import com.cloudera.impala.thrift.TAccessLevel; -import com.cloudera.impala.thrift.TCatalogObjectType; -import com.cloudera.impala.thrift.TColumn; -import com.cloudera.impala.thrift.THdfsFileBlock; -import com.cloudera.impala.thrift.THdfsPartition; -import 
com.cloudera.impala.thrift.THdfsPartitionLocation; -import com.cloudera.impala.thrift.THdfsTable; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TPartitionKeyValue; -import com.cloudera.impala.thrift.TResultRow; -import com.cloudera.impala.thrift.TResultSet; -import com.cloudera.impala.thrift.TResultSetMetadata; -import com.cloudera.impala.thrift.TTable; -import com.cloudera.impala.thrift.TTableDescriptor; -import com.cloudera.impala.thrift.TTableType; -import com.cloudera.impala.util.AvroSchemaConverter; -import com.cloudera.impala.util.AvroSchemaParser; -import com.cloudera.impala.util.AvroSchemaUtils; -import com.cloudera.impala.util.FsPermissionChecker; -import com.cloudera.impala.util.HdfsCachingUtil; -import com.cloudera.impala.util.ListMap; -import com.cloudera.impala.util.MetaStoreUtil; -import com.cloudera.impala.util.TAccessLevelUtil; -import com.cloudera.impala.util.TResultRowBuilder; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableMap; - -/** - * Internal representation of table-related metadata of a file-resident table on a - * Hadoop filesystem. The table data can be accessed through libHDFS (which is more of - * an abstraction over Hadoop's FileSystem class rather than DFS specifically). A - * partitioned table can even span multiple filesystems. - * - * This class is not thread-safe. Clients of this class need to protect against - * concurrent updates using external locking (see CatalogOpExecutor class). - * - * Owned by Catalog instance. - * The partition keys constitute the clustering columns. - * - */ -public class HdfsTable extends Table { - // hive's default value for table property 'serialization.null.format' - private static final String DEFAULT_NULL_COLUMN_VALUE = "\\N"; - - // Name of default partition for unpartitioned tables - private static final String DEFAULT_PARTITION_NAME = ""; - - // Number of times to retry fetching the partitions from the HMS should an error occur. - private final static int NUM_PARTITION_FETCH_RETRIES = 5; - - // Table property key for skip.header.line.count - public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count"; - - // An invalid network address, which will always be treated as remote. - private final static TNetworkAddress REMOTE_NETWORK_ADDRESS = - new TNetworkAddress("remote*addr", 0); - - // Minimum block size in bytes allowed for synthetic file blocks (other than the last - // block, which may be shorter). - private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024; - - // string to indicate NULL. set in load() from table properties - private String nullColumnValue_; - - // hive uses this string for NULL partition keys. Set in load(). - private String nullPartitionKeyValue_; - - // Avro schema of this table if this is an Avro table, otherwise null. Set in load(). - private String avroSchema_ = null; - - // Set to true if any of the partitions have Avro data. - private boolean hasAvroData_ = false; - - // True if this table's metadata is marked as cached. Does not necessarily mean the - // data is cached or that all/any partitions are cached. 
- private boolean isMarkedCached_ = false; - - private static boolean hasLoggedDiskIdFormatWarning_ = false; - - // Array of sorted maps storing the association between partition values and - // partition ids. There is one sorted map per partition key. - // TODO: We should not populate this for HdfsTable objects stored in the catalog - // server. - private ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ = - Lists.newArrayList(); - - // Array of partition id sets that correspond to partitions with null values - // in the partition keys; one set per partition key. - private ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList(); - - // Map of partition ids to HdfsPartitions. - private HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap(); - - // Map of partition name to HdfsPartition object. Used for speeding up - // table metadata loading. - private HashMap<String, HdfsPartition> nameToPartitionMap_ = Maps.newHashMap(); - - // Store all the partition ids of an HdfsTable. - private HashSet<Long> partitionIds_ = Sets.newHashSet(); - - // Maximum size (in bytes) of incremental stats the catalog is allowed to serialize per - // table. This limit is set as a safety check, to prevent the JVM from hitting a - // maximum array limit of 1GB (or OOM) while building the thrift objects to send to - // impalads. - public static final long MAX_INCREMENTAL_STATS_SIZE_BYTES = 200 * 1024 * 1024; - - // Estimate (in bytes) of the incremental stats size per column per partition - public static final long STATS_SIZE_PER_COLUMN_BYTES = 400; - - // Bi-directional map between an integer index and a unique datanode - // TNetworkAddresses, each of which contains blocks of 1 or more - // files in this table. The network addresses are stored using IP - // address as the host name. Each FileBlock specifies a list of - // indices within this hostIndex_ to specify which nodes contain - // replicas of the block. - private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>(); - - private HdfsPartitionLocationCompressor partitionLocationCompressor_; - - // Map of file names to file descriptors for each partition location (directory). - private Map<String, Map<String, FileDescriptor>> - perPartitionFileDescMap_ = Maps.newHashMap(); - - // Total number of Hdfs files in this table. Set in load(). - private long numHdfsFiles_; - - // Sum of sizes of all Hdfs files in this table. Set in load(). - private long totalHdfsBytes_; - - // True iff the table's partitions are located on more than one filesystem. - private boolean multipleFileSystems_ = false; - - // Base Hdfs directory where files of this table are stored. - // For unpartitioned tables it is simply the path where all files live. - // For partitioned tables it is the root directory - // under which partition dirs are placed. - protected String hdfsBaseDir_; - - // List of FieldSchemas that correspond to the non-partition columns. Used when - // describing this table and its partitions to the HMS (e.g. as part of an alter table - // operation), when only non-partition columns are required. - private final List<FieldSchema> nonPartFieldSchemas_ = Lists.newArrayList(); - - // Flag to check if the table schema has been loaded. Used as a precondition - // for setAvroSchema(). 
- private boolean isSchemaLoaded_ = false; - - private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class); - - // Caching this configuration object makes calls to getFileSystem much quicker - // (saves ~50ms on a standard plan) - // TODO(henry): confirm that this is thread safe - cursory inspection of the class - // and its usage in getFileSystem suggests it should be. - private static final Configuration CONF = new Configuration(); - - private static final boolean SUPPORTS_VOLUME_ID; - - // Wrapper around a FileSystem object to hash based on the underlying FileSystem's - // scheme and authority. - private static class FsKey { - FileSystem filesystem; - - public FsKey(FileSystem fs) { filesystem = fs; } - - @Override - public int hashCode() { return filesystem.getUri().hashCode(); } - - @Override - public boolean equals(Object o) { - if (o == this) return true; - if (o != null && o instanceof FsKey) { - URI uri = filesystem.getUri(); - URI otherUri = ((FsKey)o).filesystem.getUri(); - return uri.equals(otherUri); - } - return false; - } - - @Override - public String toString() { return filesystem.getUri().toString(); } - } - - // Keeps track of newly added THdfsFileBlock metadata and its corresponding - // BlockLocation. For each i, blocks.get(i) corresponds to locations.get(i). Once - // all the new file blocks are collected, the disk volume IDs are retrieved in one - // batched DFS call. - private static class FileBlocksInfo { - final List<THdfsFileBlock> blocks = Lists.newArrayList(); - final List<BlockLocation> locations = Lists.newArrayList(); - - public void addBlocks(List<THdfsFileBlock> b, List<BlockLocation> l) { - Preconditions.checkState(b.size() == l.size()); - blocks.addAll(b); - locations.addAll(l); - } - } - - public HdfsTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTbl, - Db db, String name, String owner) { - super(id, msTbl, db, name, owner); - partitionLocationCompressor_ = - new HdfsPartitionLocationCompressor(numClusteringCols_); - } - - static { - SUPPORTS_VOLUME_ID = - CONF.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); - } - - /** - * Returns a disk id (0-based) index from the Hdfs VolumeId object. - * There is currently no public API to get at the volume id. We'll have to get it - * by accessing the internals. - */ - private static int getDiskId(VolumeId hdfsVolumeId) { - // Initialize the diskId as -1 to indicate it is unknown - int diskId = -1; - - if (hdfsVolumeId != null) { - // TODO: this is a hack and we'll have to address this by getting the - // public API. Also, we need to be very mindful of this when we change - // the version of HDFS. - String volumeIdString = hdfsVolumeId.toString(); - // This is the hacky part. The toString is currently the underlying id - // encoded as hex. - byte[] volumeIdBytes = StringUtils.hexStringToByte(volumeIdString); - if (volumeIdBytes != null && volumeIdBytes.length == 4) { - diskId = Bytes.toInt(volumeIdBytes); - } else if (!hasLoggedDiskIdFormatWarning_) { - LOG.warn("wrong disk id format: " + volumeIdString); - hasLoggedDiskIdFormatWarning_ = true; - } - } - return diskId; - } - - public boolean spansMultipleFileSystems() { return multipleFileSystems_; } - - /** - * Returns true if the table resides at a location which supports caching (e.g. HDFS). 
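[Annotation] The getDiskId() helper above relies on VolumeId.toString() returning the underlying id as a 4-byte hex string, which the comments themselves flag as a hack. For orientation, a minimal standalone sketch of that decoding, with plain JDK calls standing in for Hadoop's StringUtils.hexStringToByte and HBase's Bytes.toInt; the DiskIdDecoder name is invented for illustration and is not part of the deleted file.

// Illustrative sketch (not part of the original file): decode a VolumeId-style
// 4-byte hex string into a 0-based disk id, returning -1 when the format is
// unexpected, mirroring the logic of getDiskId() above.
public final class DiskIdDecoder {
  public static int decode(String volumeIdHex) {
    if (volumeIdHex == null || volumeIdHex.length() != 8) return -1;  // expect 4 bytes
    try {
      byte[] bytes = new byte[4];
      for (int i = 0; i < 4; ++i) {
        bytes[i] = (byte) Integer.parseInt(volumeIdHex.substring(2 * i, 2 * i + 2), 16);
      }
      // Big-endian int, matching the behavior of Bytes.toInt().
      return java.nio.ByteBuffer.wrap(bytes).getInt();
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(decode("00000002"));  // prints 2
  }
}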
- */ - public boolean isLocationCacheable() { - return FileSystemUtil.isPathCacheable(new Path(getLocation())); - } - - /** - * Returns true if the table and all its partitions reside at locations which - * support caching (e.g. HDFS). - */ - public boolean isCacheable() { - if (!isLocationCacheable()) return false; - if (!isMarkedCached() && numClusteringCols_ > 0) { - for (HdfsPartition partition: getPartitions()) { - if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) { - continue; - } - if (!partition.isCacheable()) { - return false; - } - } - } - return true; - } - - /** - * Queries the filesystem to load the file block metadata (e.g. DFS blocks) for the - * given file. Adds the newly created block metadata and block location to the - * perFsFileBlocks, so that the disk IDs for each block can be retrieved with one - * call to DFS. - */ - private void loadBlockMetadata(FileSystem fs, FileStatus file, FileDescriptor fd, - HdfsFileFormat fileFormat, Map<FsKey, FileBlocksInfo> perFsFileBlocks) { - Preconditions.checkNotNull(fd); - Preconditions.checkNotNull(perFsFileBlocks); - Preconditions.checkArgument(!file.isDirectory()); - LOG.debug("load block md for " + name_ + " file " + fd.getFileName()); - - if (!FileSystemUtil.hasGetFileBlockLocations(fs)) { - synthesizeBlockMetadata(fs, fd, fileFormat); - return; - } - try { - BlockLocation[] locations = fs.getFileBlockLocations(file, 0, file.getLen()); - Preconditions.checkNotNull(locations); - - // Loop over all blocks in the file. - for (BlockLocation loc: locations) { - Preconditions.checkNotNull(loc); - // Get the location of all block replicas in ip:port format. - String[] blockHostPorts = loc.getNames(); - // Get the hostnames for all block replicas. Used to resolve which hosts - // contain cached data. The results are returned in the same order as - // block.getNames() so it allows us to match a host specified as ip:port to - // corresponding hostname using the same array index. - String[] blockHostNames = loc.getHosts(); - Preconditions.checkState(blockHostNames.length == blockHostPorts.length); - // Get the hostnames that contain cached replicas of this block. - Set<String> cachedHosts = - Sets.newHashSet(Arrays.asList(loc.getCachedHosts())); - Preconditions.checkState(cachedHosts.size() <= blockHostNames.length); - - // Now enumerate all replicas of the block, adding any unknown hosts - // to hostMap_/hostList_. The host ID (index in to the hostList_) for each - // replica is stored in replicaHostIdxs. - List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize( - blockHostPorts.length); - for (int i = 0; i < blockHostPorts.length; ++i) { - TNetworkAddress networkAddress = BlockReplica.parseLocation(blockHostPorts[i]); - Preconditions.checkState(networkAddress != null); - replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress), - cachedHosts.contains(blockHostNames[i]))); - } - fd.addFileBlock(new FileBlock(loc.getOffset(), loc.getLength(), replicas)); - } - // Remember the THdfsFileBlocks and corresponding BlockLocations. Once all the - // blocks are collected, the disk IDs will be queried in one batch per filesystem. 
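[Annotation] The replica loop in loadBlockMetadata() above depends on BlockLocation returning getNames() (ip:port) and getHosts() (hostnames) in the same order, so cached replicas can be identified by hostname while the scheduler address keeps the ip:port form. A minimal sketch of that index alignment, assuming that ordering contract; the Replica record and class name are invented for illustration.

// Illustrative sketch (not part of the original file): align a block's ip:port
// replica list with its hostname list to flag which replicas are HDFS-cached.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public final class ReplicaCacheFlags {
  record Replica(String hostPort, boolean isCached) {}

  static List<Replica> build(String[] hostPorts, String[] hostNames, Set<String> cachedHosts) {
    if (hostPorts.length != hostNames.length) throw new IllegalArgumentException();
    List<Replica> replicas = new ArrayList<>(hostPorts.length);
    for (int i = 0; i < hostPorts.length; ++i) {
      // The i-th ip:port and the i-th hostname refer to the same replica, so the
      // cached-host membership test can be done by hostname.
      replicas.add(new Replica(hostPorts[i], cachedHosts.contains(hostNames[i])));
    }
    return replicas;
  }

  public static void main(String[] args) {
    System.out.println(build(new String[] {"10.0.0.1:50010", "10.0.0.2:50010"},
        new String[] {"node1", "node2"}, Set.of("node2")));
  }
}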
- addPerFsFileBlocks(perFsFileBlocks, fs, fd.getFileBlocks(), - Arrays.asList(locations)); - } catch (IOException e) { - throw new RuntimeException("couldn't determine block locations for path '" + - file.getPath() + "':\n" + e.getMessage(), e); - } - } - - /** - * For filesystems that don't override getFileBlockLocations, synthesize file blocks - * by manually splitting the file range into fixed-size blocks. That way, scan - * ranges can be derived from file blocks as usual. All synthesized blocks are given - * an invalid network address so that the scheduler will treat them as remote. - */ - private void synthesizeBlockMetadata(FileSystem fs, FileDescriptor fd, - HdfsFileFormat fileFormat) { - long start = 0; - long remaining = fd.getFileLength(); - // Workaround HADOOP-11584 by using the filesystem default block size rather than - // the block size from the FileStatus. - // TODO: after HADOOP-11584 is resolved, get the block size from the FileStatus. - long blockSize = fs.getDefaultBlockSize(); - if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE; - if (!fileFormat.isSplittable(HdfsCompression.fromFileName(fd.getFileName()))) { - blockSize = remaining; - } - while (remaining > 0) { - long len = Math.min(remaining, blockSize); - List<BlockReplica> replicas = Lists.newArrayList( - new BlockReplica(hostIndex_.getIndex(REMOTE_NETWORK_ADDRESS), false)); - fd.addFileBlock(new FileBlock(start, len, replicas)); - remaining -= len; - start += len; - } - } - - /** - * Populates disk/volume ID metadata inside the newly created THdfsFileBlocks. - * perFsFileBlocks maps from each filesystem to a FileBLocksInfo. The first list - * contains the newly created THdfsFileBlocks and the second contains the - * corresponding BlockLocations. - */ - private void loadDiskIds(Map<FsKey, FileBlocksInfo> perFsFileBlocks) { - if (!SUPPORTS_VOLUME_ID) return; - // Loop over each filesystem. If the filesystem is DFS, retrieve the volume IDs - // for all the blocks. - for (FsKey fsKey: perFsFileBlocks.keySet()) { - FileSystem fs = fsKey.filesystem; - // Only DistributedFileSystem has getFileBlockStorageLocations(). It's not even - // part of the FileSystem interface, so we'll need to downcast. - if (!(fs instanceof DistributedFileSystem)) continue; - - LOG.trace("Loading disk ids for: " + getFullName() + ". nodes: " + - hostIndex_.size() + ". filesystem: " + fsKey); - DistributedFileSystem dfs = (DistributedFileSystem)fs; - FileBlocksInfo blockLists = perFsFileBlocks.get(fsKey); - Preconditions.checkNotNull(blockLists); - BlockStorageLocation[] storageLocs = null; - try { - // Get the BlockStorageLocations for all the blocks - storageLocs = dfs.getFileBlockStorageLocations(blockLists.locations); - } catch (IOException e) { - LOG.error("Couldn't determine block storage locations for filesystem " + - fs + ":\n" + e.getMessage()); - continue; - } - if (storageLocs == null || storageLocs.length == 0) { - LOG.warn("Attempted to get block locations for filesystem " + fs + - " but the call returned no results"); - continue; - } - if (storageLocs.length != blockLists.locations.size()) { - // Block locations and storage locations didn't match up. - LOG.error("Number of block storage locations not equal to number of blocks: " - + "#storage locations=" + Long.toString(storageLocs.length) - + " #blocks=" + Long.toString(blockLists.locations.size())); - continue; - } - long unknownDiskIdCount = 0; - // Attach volume IDs given by the storage location to the corresponding - // THdfsFileBlocks. 
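[Annotation] For filesystems without getFileBlockLocations(), synthesizeBlockMetadata() above cuts the file range into fixed-size pseudo-blocks (a single block for non-splittable formats) so scan ranges can still be derived. A minimal standalone sketch of that splitting loop; it tracks offsets and lengths only, and the names are invented for illustration.

// Illustrative sketch (not part of the original file): synthesize block offsets
// and lengths for a file, clamping the block size up to a minimum and collapsing
// non-splittable files into a single block, as in synthesizeBlockMetadata() above.
import java.util.ArrayList;
import java.util.List;

public final class SyntheticBlocks {
  static final long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;

  record Block(long offset, long length) {}

  static List<Block> split(long fileLength, long fsDefaultBlockSize, boolean splittable) {
    long blockSize = Math.max(fsDefaultBlockSize, MIN_SYNTHETIC_BLOCK_SIZE);
    if (!splittable) blockSize = fileLength;          // one block for non-splittable formats
    List<Block> blocks = new ArrayList<>();
    long start = 0, remaining = fileLength;
    while (remaining > 0) {
      long len = Math.min(remaining, blockSize);      // last block may be shorter
      blocks.add(new Block(start, len));
      start += len;
      remaining -= len;
    }
    return blocks;
  }

  public static void main(String[] args) {
    // A 2.5 MiB file with a 128 KiB default block size -> 1 MiB, 1 MiB, 0.5 MiB blocks.
    System.out.println(split(5L * 512 * 1024, 128 * 1024, true));
  }
}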
- for (int locIdx = 0; locIdx < storageLocs.length; ++locIdx) { - VolumeId[] volumeIds = storageLocs[locIdx].getVolumeIds(); - THdfsFileBlock block = blockLists.blocks.get(locIdx); - // Convert opaque VolumeId to 0 based ids. - // TODO: the diskId should be eventually retrievable from Hdfs when the - // community agrees this API is useful. - int[] diskIds = new int[volumeIds.length]; - for (int i = 0; i < volumeIds.length; ++i) { - diskIds[i] = getDiskId(volumeIds[i]); - if (diskIds[i] < 0) ++unknownDiskIdCount; - } - FileBlock.setDiskIds(diskIds, block); - } - if (unknownDiskIdCount > 0) { - LOG.warn("Unknown disk id count for filesystem " + fs + ":" + unknownDiskIdCount); - } - } - } - - @Override - public TCatalogObjectType getCatalogObjectType() { - return TCatalogObjectType.TABLE; - } - public boolean isMarkedCached() { return isMarkedCached_; } - - public Collection<HdfsPartition> getPartitions() { return partitionMap_.values(); } - public Map<Long, HdfsPartition> getPartitionMap() { return partitionMap_; } - public Map<String, HdfsPartition> getNameToPartitionMap() { - return nameToPartitionMap_; - } - public Set<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); } - public HdfsPartitionLocationCompressor getPartitionLocationCompressor() { - return partitionLocationCompressor_; - } - public Set<Long> getPartitionIds() { return partitionIds_; } - public TreeMap<LiteralExpr, HashSet<Long>> getPartitionValueMap(int i) { - return partitionValuesMap_.get(i); - } - - /** - * Returns the value Hive is configured to use for NULL partition key values. - * Set during load. - */ - public String getNullPartitionKeyValue() { return nullPartitionKeyValue_; } - public String getNullColumnValue() { return nullColumnValue_; } - - /* - * Returns the storage location (HDFS path) of this table. - */ - public String getLocation() { - return super.getMetaStoreTable().getSd().getLocation(); - } - - List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; } - - // True if Impala has HDFS write permissions on the hdfsBaseDir (for an unpartitioned - // table) or if Impala has write permissions on all partition directories (for - // a partitioned table). - public boolean hasWriteAccess() { - return TAccessLevelUtil.impliesWriteAccess(accessLevel_); - } - - /** - * Returns the first location (HDFS path) that Impala does not have WRITE access - * to, or an null if none is found. For an unpartitioned table, this just - * checks the hdfsBaseDir. For a partitioned table it checks all partition directories. - */ - public String getFirstLocationWithoutWriteAccess() { - if (getMetaStoreTable() == null) return null; - - if (getMetaStoreTable().getPartitionKeysSize() == 0) { - if (!TAccessLevelUtil.impliesWriteAccess(accessLevel_)) { - return hdfsBaseDir_; - } - } else { - for (HdfsPartition partition: partitionMap_.values()) { - if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) { - return partition.getLocation(); - } - } - } - return null; - } - - /** - * Gets the HdfsPartition matching the given partition spec. Returns null if no match - * was found. 
- */ - public HdfsPartition getPartition( - List<PartitionKeyValue> partitionSpec) { - List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList(); - for (PartitionKeyValue kv: partitionSpec) { - String value = PartitionKeyValue.getPartitionKeyValueString( - kv.getLiteralValue(), getNullPartitionKeyValue()); - partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value)); - } - return getPartitionFromThriftPartitionSpec(partitionKeyValues); - } - - /** - * Gets the HdfsPartition matching the Thrift version of the partition spec. - * Returns null if no match was found. - */ - public HdfsPartition getPartitionFromThriftPartitionSpec( - List<TPartitionKeyValue> partitionSpec) { - // First, build a list of the partition values to search for in the same order they - // are defined in the table. - List<String> targetValues = Lists.newArrayList(); - Set<String> keys = Sets.newHashSet(); - for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) { - for (TPartitionKeyValue kv: partitionSpec) { - if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) { - targetValues.add(kv.getValue().toLowerCase()); - // Same key was specified twice - if (!keys.add(kv.getName().toLowerCase())) { - return null; - } - } - } - } - - // Make sure the number of values match up and that some values were found. - if (targetValues.size() == 0 || - (targetValues.size() != getMetaStoreTable().getPartitionKeysSize())) { - return null; - } - - // Search through all the partitions and check if their partition key values - // match the values being searched for. - for (HdfsPartition partition: partitionMap_.values()) { - if (partition.isDefaultPartition()) continue; - List<LiteralExpr> partitionValues = partition.getPartitionValues(); - Preconditions.checkState(partitionValues.size() == targetValues.size()); - boolean matchFound = true; - for (int i = 0; i < targetValues.size(); ++i) { - String value; - if (partitionValues.get(i) instanceof NullLiteral) { - value = getNullPartitionKeyValue(); - } else { - value = partitionValues.get(i).getStringValue(); - Preconditions.checkNotNull(value); - // See IMPALA-252: we deliberately map empty strings on to - // NULL when they're in partition columns. This is for - // backwards compatibility with Hive, and is clearly broken. - if (value.isEmpty()) value = getNullPartitionKeyValue(); - } - if (!targetValues.get(i).equals(value.toLowerCase())) { - matchFound = false; - break; - } - } - if (matchFound) { - return partition; - } - } - return null; - } - - /** - * Create columns corresponding to fieldSchemas. Throws a TableLoadingException if the - * metadata is incompatible with what we support. - */ - private void addColumnsFromFieldSchemas(List<FieldSchema> fieldSchemas) - throws TableLoadingException { - int pos = colsByPos_.size(); - for (FieldSchema s: fieldSchemas) { - Type type = parseColumnType(s); - // Check if we support partitioning on columns of such a type. - if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) { - throw new TableLoadingException( - String.format("Failed to load metadata for table '%s' because of " + - "unsupported partition-column type '%s' in partition column '%s'", - getFullName(), type.toString(), s.getName())); - } - - Column col = new Column(s.getName(), type, s.getComment(), pos); - addColumn(col); - ++pos; - } - } - - /** - * Clear the partitions of an HdfsTable and the associated metadata. 
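[Annotation] getPartitionFromThriftPartitionSpec() above compares the requested values case-insensitively in table key order and maps both NULL literals and empty strings onto Hive's null partition key value (the IMPALA-252 behavior called out in the comments). A minimal sketch of just that comparison rule, modeling partition values as plain strings (null standing in for a NULL literal); the names are invented for illustration.

// Illustrative sketch (not part of the original file): the value-matching rule
// used when resolving a partition spec against a partition's key values.
import java.util.List;

public final class PartitionSpecMatcher {
  static boolean matches(List<String> partitionValues, List<String> targetValues,
      String nullPartitionKeyValue) {
    if (partitionValues.size() != targetValues.size()) return false;
    for (int i = 0; i < targetValues.size(); ++i) {
      String value = partitionValues.get(i);
      // NULL literals and empty strings both map onto the configured NULL value.
      if (value == null || value.isEmpty()) value = nullPartitionKeyValue;
      if (!targetValues.get(i).equalsIgnoreCase(value)) return false;
    }
    return true;
  }

  public static void main(String[] args) {
    String hiveNull = "__HIVE_DEFAULT_PARTITION__";
    System.out.println(matches(List.of("2016", ""), List.of("2016", hiveNull), hiveNull)); // true
  }
}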
- */ - private void resetPartitions() { - partitionIds_.clear(); - partitionMap_.clear(); - nameToPartitionMap_.clear(); - partitionValuesMap_.clear(); - nullPartitionIds_.clear(); - perPartitionFileDescMap_.clear(); - // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats. - for (int i = 0; i < numClusteringCols_; ++i) { - getColumns().get(i).getStats().setNumNulls(0); - getColumns().get(i).getStats().setNumDistinctValues(0); - partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap()); - nullPartitionIds_.add(Sets.<Long>newHashSet()); - } - numHdfsFiles_ = 0; - totalHdfsBytes_ = 0; - } - - /** - * Resets any partition metadata, creates the default partition and sets the base - * table directory path as well as the caching info from the HMS table. - */ - private void initializePartitionMetadata( - org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException { - Preconditions.checkNotNull(msTbl); - resetPartitions(); - hdfsBaseDir_ = msTbl.getSd().getLocation(); - // INSERT statements need to refer to this if they try to write to new partitions - // Scans don't refer to this because by definition all partitions they refer to - // exist. - addDefaultPartition(msTbl.getSd()); - - // We silently ignore cache directives that no longer exist in HDFS, and remove - // non-existing cache directives from the parameters. - isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters()); - } - - /** - * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this - * table's partition list. Any partition metadata will be reset and loaded from - * scratch. - * - * If there are no partitions in the Hive metadata, a single partition is added with no - * partition keys. - */ - private void loadAllPartitions( - List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions, - org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException, - CatalogException { - Preconditions.checkNotNull(msTbl); - initializePartitionMetadata(msTbl); - // Map of filesystem to the file blocks for new/modified FileDescriptors. Blocks in - // this map will have their disk volume IDs information (re)loaded. This is used to - // speed up the incremental refresh of a table's metadata by skipping unmodified, - // previously loaded blocks. - Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap(); - if (msTbl.getPartitionKeysSize() == 0) { - Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty()); - // This table has no partition key, which means it has no declared partitions. - // We model partitions slightly differently to Hive - every file must exist in a - // partition, so add a single partition with no keys which will get all the - // files in the table's root directory. - HdfsPartition part = createPartition(msTbl.getSd(), null, blocksToLoad); - if (isMarkedCached_) part.markCached(); - addPartition(part); - Path location = new Path(hdfsBaseDir_); - FileSystem fs = location.getFileSystem(CONF); - if (fs.exists(location)) { - accessLevel_ = getAvailableAccessLevel(fs, location); - } - } else { - for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { - HdfsPartition partition = createPartition(msPartition.getSd(), msPartition, - blocksToLoad); - addPartition(partition); - // If the partition is null, its HDFS path does not exist, and it was not added - // to this table's partition list. Skip the partition. 
- if (partition == null) continue; - if (msPartition.getParameters() != null) { - partition.setNumRows(getRowCount(msPartition.getParameters())); - } - if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) { - // TODO: READ_ONLY isn't exactly correct because the it's possible the - // partition does not have READ permissions either. When we start checking - // whether we can READ from a table, this should be updated to set the - // table's access level to the "lowest" effective level across all - // partitions. That is, if one partition has READ_ONLY and another has - // WRITE_ONLY the table's access level should be NONE. - accessLevel_ = TAccessLevel.READ_ONLY; - } - } - } - loadDiskIds(blocksToLoad); - } - - /** - * Gets the AccessLevel that is available for Impala for this table based on the - * permissions Impala has on the given path. If the path does not exist, recurses up - * the path until a existing parent directory is found, and inherit access permissions - * from that. - */ - private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location) - throws IOException { - FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance(); - while (location != null) { - if (fs.exists(location)) { - FsPermissionChecker.Permissions perms = - permissionChecker.getPermissions(fs, location); - if (perms.canReadAndWrite()) { - return TAccessLevel.READ_WRITE; - } else if (perms.canRead()) { - return TAccessLevel.READ_ONLY; - } else if (perms.canWrite()) { - return TAccessLevel.WRITE_ONLY; - } - return TAccessLevel.NONE; - } - location = location.getParent(); - } - // Should never get here. - Preconditions.checkNotNull(location, "Error: no path ancestor exists"); - return TAccessLevel.NONE; - } - - /** - * Creates a new HdfsPartition object to be added to HdfsTable's partition list. - * Partitions may be empty, or may not even exist in the filesystem (a partition's - * location may have been changed to a new path that is about to be created by an - * INSERT). Also loads the block metadata for this partition. Returns new partition - * if successful or null if none was created. - * - * Throws CatalogException if the supplied storage descriptor contains metadata that - * Impala can't understand. - */ - public HdfsPartition createPartition(StorageDescriptor storageDescriptor, - org.apache.hadoop.hive.metastore.api.Partition msPartition) - throws CatalogException { - Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap(); - HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition, - blocksToLoad); - loadDiskIds(blocksToLoad); - return hdfsPartition; - } - - /** - * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition - * object. It populates 'perFsFileBlock' with the blocks to be loaded for each file in - * the partition directory. 
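[Annotation] getAvailableAccessLevel() above walks up the directory tree until it finds an existing ancestor, so a partition location that has not been created yet inherits the effective access level of its closest existing parent. A minimal sketch of that walk-up rule using java.nio.file in place of Hadoop's FileSystem and FsPermissionChecker; the class name is invented for illustration.

// Illustrative sketch (not part of the original file): derive an access level
// from the nearest existing ancestor of a (possibly not yet created) path.
import java.nio.file.Files;
import java.nio.file.Path;

public final class AccessLevelProbe {
  enum AccessLevel { NONE, READ_ONLY, WRITE_ONLY, READ_WRITE }

  static AccessLevel availableAccessLevel(Path location) {
    while (location != null) {
      if (Files.exists(location)) {
        boolean r = Files.isReadable(location);
        boolean w = Files.isWritable(location);
        if (r && w) return AccessLevel.READ_WRITE;
        if (r) return AccessLevel.READ_ONLY;
        if (w) return AccessLevel.WRITE_ONLY;
        return AccessLevel.NONE;
      }
      location = location.getParent();   // keep climbing until something exists
    }
    return AccessLevel.NONE;             // no existing ancestor
  }

  public static void main(String[] args) {
    System.out.println(availableAccessLevel(Path.of("/tmp/does/not/exist/yet")));
  }
}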
- */ - private HdfsPartition createPartition(StorageDescriptor storageDescriptor, - org.apache.hadoop.hive.metastore.api.Partition msPartition, - Map<FsKey, FileBlocksInfo> perFsFileBlocks) - throws CatalogException { - HdfsStorageDescriptor fileFormatDescriptor = - HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor); - List<LiteralExpr> keyValues = Lists.newArrayList(); - boolean isMarkedCached = isMarkedCached_; - if (msPartition != null) { - isMarkedCached = HdfsCachingUtil.validateCacheParams(msPartition.getParameters()); - // Load key values - for (String partitionKey: msPartition.getValues()) { - Type type = getColumns().get(keyValues.size()).getType(); - // Deal with Hive's special NULL partition key. - if (partitionKey.equals(nullPartitionKeyValue_)) { - keyValues.add(NullLiteral.create(type)); - } else { - try { - keyValues.add(LiteralExpr.create(partitionKey, type)); - } catch (Exception ex) { - LOG.warn("Failed to create literal expression of type: " + type, ex); - throw new CatalogException("Invalid partition key value of type: " + type, - ex); - } - } - } - try { - Expr.analyze(keyValues, null); - } catch (AnalysisException e) { - // should never happen - throw new IllegalStateException(e); - } - } - - Path partDirPath = new Path(storageDescriptor.getLocation()); - try { - FileSystem fs = partDirPath.getFileSystem(CONF); - multipleFileSystems_ = multipleFileSystems_ || - !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs); - updatePartitionFds(partDirPath, isMarkedCached, - fileFormatDescriptor.getFileFormat(), perFsFileBlocks); - HdfsPartition partition = - new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor, - perPartitionFileDescMap_.get(partDirPath.toString()).values(), - getAvailableAccessLevel(fs, partDirPath)); - partition.checkWellFormed(); - return partition; - } catch (IOException e) { - throw new CatalogException("Error initializing partition", e); - } - } - - /** - * Add the given THdfsFileBlocks and BlockLocations to the FileBlockInfo for the - * given filesystem. - */ - private void addPerFsFileBlocks(Map<FsKey, FileBlocksInfo> fsToBlocks, FileSystem fs, - List<THdfsFileBlock> blocks, List<BlockLocation> locations) { - FsKey fsKey = new FsKey(fs); - FileBlocksInfo infos = fsToBlocks.get(fsKey); - if (infos == null) { - infos = new FileBlocksInfo(); - fsToBlocks.put(fsKey, infos); - } - infos.addBlocks(blocks, locations); - } - - /** - * Adds the partition to the HdfsTable. Throws a CatalogException if the partition - * already exists in this table. - */ - public void addPartition(HdfsPartition partition) throws CatalogException { - if (partitionMap_.containsKey(partition.getId())) { - throw new CatalogException(String.format("Partition %s already exists in table %s", - partition.getPartitionName(), getFullName())); - } - partitionMap_.put(partition.getId(), partition); - totalHdfsBytes_ += partition.getSize(); - numHdfsFiles_ += partition.getNumFileDescriptors(); - updatePartitionMdAndColStats(partition); - } - - /** - * Updates the HdfsTable's partition metadata, i.e. adds the id to the HdfsTable and - * populates structures used for speeding up partition pruning/lookup. Also updates - * column stats. 
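[Annotation] addPerFsFileBlocks() above groups newly loaded blocks by filesystem (via FsKey, which hashes on the FileSystem URI) so that disk-volume lookups can later be issued as one batched call per filesystem. A minimal sketch of that get-or-create batching, with blocks modeled as opaque strings; the names are invented for illustration.

// Illustrative sketch (not part of the original file): batch items per filesystem
// URI so a later bulk operation can run once per filesystem.
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class PerFsBatcher {
  private final Map<URI, List<String>> perFsBlocks = new HashMap<>();

  void addBlocks(URI filesystemUri, List<String> blocks) {
    // Get-or-create the batch for this filesystem, then append.
    perFsBlocks.computeIfAbsent(filesystemUri, k -> new ArrayList<>()).addAll(blocks);
  }

  public static void main(String[] args) {
    PerFsBatcher b = new PerFsBatcher();
    b.addBlocks(URI.create("hdfs://nn:8020"), List.of("blk_1", "blk_2"));
    b.addBlocks(URI.create("hdfs://nn:8020"), List.of("blk_3"));
    b.addBlocks(URI.create("s3a://bucket"), List.of("blk_4"));
    System.out.println(b.perFsBlocks);   // two batches, keyed by filesystem URI
  }
}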
- */ - private void updatePartitionMdAndColStats(HdfsPartition partition) { - if (partition.getPartitionValues().size() != numClusteringCols_) return; - partitionIds_.add(partition.getId()); - for (int i = 0; i < partition.getPartitionValues().size(); ++i) { - ColumnStats stats = getColumns().get(i).getStats(); - LiteralExpr literal = partition.getPartitionValues().get(i); - // Store partitions with null partition values separately - if (literal instanceof NullLiteral) { - stats.setNumNulls(stats.getNumNulls() + 1); - if (nullPartitionIds_.get(i).isEmpty()) { - stats.setNumDistinctValues(stats.getNumDistinctValues() + 1); - } - nullPartitionIds_.get(i).add(partition.getId()); - continue; - } - HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal); - if (partitionIds == null) { - partitionIds = Sets.newHashSet(); - partitionValuesMap_.get(i).put(literal, partitionIds); - stats.setNumDistinctValues(stats.getNumDistinctValues() + 1); - } - partitionIds.add(partition.getId()); - } - nameToPartitionMap_.put(partition.getPartitionName(), partition); - } - - /** - * Drops the partition having the given partition spec from HdfsTable. Cleans up its - * metadata from all the mappings used to speed up partition pruning/lookup. - * Also updates partition column statistics. Given partitionSpec must match exactly - * one partition. - * Returns the HdfsPartition that was dropped. If the partition does not exist, returns - * null. - */ - public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) { - return dropPartition(getPartitionFromThriftPartitionSpec(partitionSpec)); - } - - /** - * Drops a partition and updates partition column statistics. Returns the - * HdfsPartition that was dropped or null if the partition does not exist. - */ - private HdfsPartition dropPartition(HdfsPartition partition) { - if (partition == null) return null; - totalHdfsBytes_ -= partition.getSize(); - numHdfsFiles_ -= partition.getNumFileDescriptors(); - Preconditions.checkArgument(partition.getPartitionValues().size() == - numClusteringCols_); - Long partitionId = partition.getId(); - // Remove the partition id from the list of partition ids and other mappings. - partitionIds_.remove(partitionId); - partitionMap_.remove(partitionId); - nameToPartitionMap_.remove(partition.getPartitionName()); - perPartitionFileDescMap_.remove(partition.getLocation()); - for (int i = 0; i < partition.getPartitionValues().size(); ++i) { - ColumnStats stats = getColumns().get(i).getStats(); - LiteralExpr literal = partition.getPartitionValues().get(i); - // Check if this is a null literal. - if (literal instanceof NullLiteral) { - nullPartitionIds_.get(i).remove(partitionId); - stats.setNumNulls(stats.getNumNulls() - 1); - if (nullPartitionIds_.get(i).isEmpty()) { - stats.setNumDistinctValues(stats.getNumDistinctValues() - 1); - } - continue; - } - HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal); - // If there are multiple partition ids corresponding to a literal, remove - // only this id. Otherwise, remove the <literal, id> pair. - if (partitionIds.size() > 1) partitionIds.remove(partitionId); - else { - partitionValuesMap_.get(i).remove(literal); - stats.setNumDistinctValues(stats.getNumDistinctValues() - 1); - } - } - return partition; - } - - private void addDefaultPartition(StorageDescriptor storageDescriptor) - throws CatalogException { - // Default partition has no files and is not referred to by scan nodes. Data sinks - // refer to this to understand how to create new partitions. 
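[Annotation] updatePartitionMdAndColStats() and dropPartition() above maintain, per clustering column, a sorted map from partition-key value to the set of partition ids plus a separate set for NULL values; these are the structures used to speed up partition pruning and lookup. A minimal sketch of one such index, with values modeled as strings; the names are invented for illustration.

// Illustrative sketch (not part of the original file): a per-column index from
// partition-key value to partition ids, with NULL values tracked separately.
// A sorted map also allows range predicates to prune via subMap().
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

public final class PartitionValueIndex {
  private final TreeMap<String, Set<Long>> valueToIds = new TreeMap<>();
  private final Set<Long> nullIds = new HashSet<>();

  void addPartition(long id, String value) {
    if (value == null) { nullIds.add(id); return; }
    valueToIds.computeIfAbsent(value, v -> new HashSet<>()).add(id);
  }

  // Equality predicates on the clustering column prune to the ids stored under the value.
  Set<Long> idsForValue(String value) {
    if (value == null) return nullIds;
    return valueToIds.getOrDefault(value, Set.of());
  }

  public static void main(String[] args) {
    PartitionValueIndex idx = new PartitionValueIndex();
    idx.addPartition(1, "2015");
    idx.addPartition(2, "2016");
    idx.addPartition(3, null);
    System.out.println(idx.idsForValue("2016"));   // [2]
    System.out.println(idx.idsForValue(null));     // [3]
  }
}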
- HdfsStorageDescriptor hdfsStorageDescriptor = - HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor); - HdfsPartition partition = HdfsPartition.defaultPartition(this, - hdfsStorageDescriptor); - partitionMap_.put(partition.getId(), partition); - } - - @Override - public void load(boolean reuseMetadata, IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { - load(reuseMetadata, client, msTbl, true, true, null); - } - - /** - * Loads table metadata from the Hive Metastore. - * - * If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore, - * including partition and file metadata. Otherwise, loads metadata incrementally and - * updates this HdfsTable in place so that it is in sync with the Hive Metastore. - * - * Depending on the operation that triggered the table metadata load, not all the - * metadata may need to be updated. If 'partitionsToUpdate' is not null, it specifies a - * list of partitions for which metadata should be updated. Otherwise, all partition - * metadata will be updated from the Hive Metastore. - * - * If 'loadFileMetadata' is true, file metadata of the specified partitions are - * reloaded while reusing existing file descriptors to avoid loading metadata for files - * that haven't changed. If 'partitionsToUpdate' is not specified, file metadata of all - * the partitions are loaded. - * - * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore. - * - * There are several cases where existing file descriptors might be reused incorrectly: - * 1. an ALTER TABLE ADD PARTITION or dynamic partition insert is executed through - * Hive. This does not update the lastDdlTime. - * 2. Hdfs rebalancer is executed. This changes the block locations but doesn't update - * the mtime (file modification time). - * If any of these occur, user has to execute "invalidate metadata" to invalidate the - * metadata cache of the table and trigger a fresh load. - */ - public void load(boolean reuseMetadata, IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadFileMetadata, - boolean loadTableSchema, Set<String> partitionsToUpdate) - throws TableLoadingException { - // turn all exceptions into TableLoadingException - msTable_ = msTbl; - try { - if (loadTableSchema) loadSchema(client, msTbl); - if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) { - // This is the special case of CTAS that creates a 'temp' table that does not - // actually exist in the Hive Metastore. - initializePartitionMetadata(msTbl); - updateStatsFromHmsTable(msTbl); - return; - } - // Load partition and file metadata - if (!reuseMetadata) { - // Load all partitions from Hive Metastore, including file metadata. - LOG.debug("load table from Hive Metastore: " + db_.getName() + "." + name_); - List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions = - Lists.newArrayList(); - msPartitions.addAll(MetaStoreUtil.fetchAllPartitions( - client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES)); - loadAllPartitions(msPartitions, msTbl); - } else { - // Incrementally update this table's partitions and file metadata - LOG.debug("incremental update for table: " + db_.getName() + "." 
+ name_); - Preconditions.checkState(partitionsToUpdate == null || loadFileMetadata); - updateMdFromHmsTable(msTbl); - if (msTbl.getPartitionKeysSize() == 0) { - if (loadFileMetadata) updateUnpartitionedTableFileMd(); - } else { - updatePartitionsFromHms(client, partitionsToUpdate, loadFileMetadata); - } - } - if (loadTableSchema) setAvroSchema(client, msTbl); - updateStatsFromHmsTable(msTbl); - } catch (TableLoadingException e) { - throw e; - } catch (Exception e) { - throw new TableLoadingException("Failed to load metadata for table: " + name_, e); - } - } - - /** - * Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_', - * and 'accessLevel_' from 'msTbl'. Throws an IOException if there was an error - * accessing the table location path. - */ - private void updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl) - throws IOException { - Preconditions.checkNotNull(msTbl); - hdfsBaseDir_ = msTbl.getSd().getLocation(); - isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters()); - if (msTbl.getPartitionKeysSize() == 0) { - Path location = new Path(hdfsBaseDir_); - FileSystem fs = location.getFileSystem(CONF); - if (fs.exists(location)) { - accessLevel_ = getAvailableAccessLevel(fs, location); - } - } - setMetaStoreTable(msTbl); - } - - /** - * Updates the file metadata of an unpartitioned HdfsTable. - */ - private void updateUnpartitionedTableFileMd() throws CatalogException { - LOG.debug("update unpartitioned table: " + name_); - resetPartitions(); - org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); - Preconditions.checkNotNull(msTbl); - addDefaultPartition(msTbl.getSd()); - Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap(); - HdfsPartition part = createPartition(msTbl.getSd(), null, fileBlocksToLoad); - addPartition(part); - loadDiskIds(fileBlocksToLoad); - if (isMarkedCached_) part.markCached(); - } - - /** - * Updates the partitions of an HdfsTable so that they are in sync with the Hive - * Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE. - * It removes from this table partitions that no longer exist in the Hive Metastore and - * adds partitions that were added externally (e.g. using Hive) to the Hive Metastore - * but do not exist in this table. If 'loadFileMetadata' is true, it triggers - * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if - * any, or for all the table partitions if 'partitionsToUpdate' is null. - */ - private void updatePartitionsFromHms(IMetaStoreClient client, - Set<String> partitionsToUpdate, boolean loadFileMetadata) throws Exception { - LOG.debug("sync table partitions: " + name_); - org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); - Preconditions.checkNotNull(msTbl); - Preconditions.checkState(msTbl.getPartitionKeysSize() != 0); - Preconditions.checkState(loadFileMetadata || partitionsToUpdate == null); - - // Retrieve all the partition names from the Hive Metastore. We need this to - // identify the delta between partitions of the local HdfsTable and the table entry - // in the Hive Metastore. Note: This is a relatively "cheap" operation - // (~.3 secs for 30K partitions). 
- Set<String> msPartitionNames = Sets.newHashSet(); - msPartitionNames.addAll( - client.listPartitionNames(db_.getName(), name_, (short) -1)); - // Names of loaded partitions in this table - Set<String> partitionNames = Sets.newHashSet(); - // Partitions for which file metadata must be loaded - List<HdfsPartition> partitionsToUpdateFileMd = Lists.newArrayList(); - // Partitions that need to be dropped and recreated from scratch - List<HdfsPartition> dirtyPartitions = Lists.newArrayList(); - // Partitions that need to be removed from this table. That includes dirty - // partitions as well as partitions that were removed from the Hive Metastore. - List<HdfsPartition> partitionsToRemove = Lists.newArrayList(); - // Identify dirty partitions that need to be loaded from the Hive Metastore and - // partitions that no longer exist in the Hive Metastore. - for (HdfsPartition partition: partitionMap_.values()) { - // Ignore the default partition - if (partition.isDefaultPartition()) continue; - // Remove partitions that don't exist in the Hive Metastore. These are partitions - // that were removed from HMS using some external process, e.g. Hive. - if (!msPartitionNames.contains(partition.getPartitionName())) { - partitionsToRemove.add(partition); - } - if (partition.isDirty()) { - // Dirty partitions are updated by removing them from table's partition - // list and loading them from the Hive Metastore. - dirtyPartitions.add(partition); - } else { - if (partitionsToUpdate == null && loadFileMetadata) { - partitionsToUpdateFileMd.add(partition); - } - } - Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor()); - partitionNames.add(partition.getPartitionName()); - } - partitionsToRemove.addAll(dirtyPartitions); - for (HdfsPartition partition: partitionsToRemove) dropPartition(partition); - // Load dirty partitions from Hive Metastore - loadPartitionsFromMetastore(dirtyPartitions, client); - - // Identify and load partitions that were added in the Hive Metastore but don't - // exist in this table. - Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames); - loadPartitionsFromMetastore(newPartitionsInHms, client); - // If a list of modified partitions (old and new) is specified, don't reload file - // metadata for the new ones as they have already been detected in HMS and have been - // reloaded by loadPartitionsFromMetastore(). - if (partitionsToUpdate != null) { - partitionsToUpdate.removeAll(newPartitionsInHms); - } - - // Load file metadata. Until we have a notification mechanism for when a - // file changes in hdfs, it is sometimes required to reload all the file - // descriptors and block metadata of a table (e.g. REFRESH statement). - if (loadFileMetadata) { - if (partitionsToUpdate != null) { - // Only reload file metadata of partitions specified in 'partitionsToUpdate' - Preconditions.checkState(partitionsToUpdateFileMd.isEmpty()); - partitionsToUpdateFileMd = getPartitionsByName(partitionsToUpdate); - } - loadPartitionFileMetadata(partitionsToUpdateFileMd); - } - } - - /** - * Returns the HdfsPartition objects associated with the specified list of partition - * names. 
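[Annotation] The incremental sync in updatePartitionsFromHms() above boils down to a name-based delta: locally loaded partitions missing from the HMS listing are dropped, and HMS names not yet loaded locally are fetched. A minimal sketch of that delta with plain JDK sets standing in for Guava's Sets.difference; the names are invented for illustration.

// Illustrative sketch (not part of the original file): compute which partitions to
// drop and which to load by comparing local and HMS partition-name sets.
import java.util.HashSet;
import java.util.Set;

public final class PartitionDelta {
  record Delta(Set<String> toRemove, Set<String> toLoad) {}

  static Delta compute(Set<String> loadedNames, Set<String> hmsNames) {
    Set<String> toRemove = new HashSet<>(loadedNames);
    toRemove.removeAll(hmsNames);                    // dropped externally (e.g. via Hive)
    Set<String> toLoad = new HashSet<>(hmsNames);
    toLoad.removeAll(loadedNames);                   // added externally, not loaded yet
    return new Delta(toRemove, toLoad);
  }

  public static void main(String[] args) {
    System.out.println(compute(Set.of("p=1", "p=2"), Set.of("p=2", "p=3")));
    // toRemove=[p=1], toLoad=[p=3]
  }
}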
- */ - private List<HdfsPartition> getPartitionsByName(Collection<String> partitionNames) { - List<HdfsPartition> partitions = Lists.newArrayList(); - for (String partitionName: partitionNames) { - String partName = DEFAULT_PARTITION_NAME; - if (partitionName.length() > 0) { - // Trim the last trailing char '/' from each partition name - partName = partitionName.substring(0, partitionName.length()-1); - } - Preconditions.checkState(nameToPartitionMap_.containsKey(partName), - "Invalid partition name: " + partName); - partitions.add(nameToPartitionMap_.get(partName)); - } - return partitions; - } - - /** - * Updates the cardinality of this table from an HMS table. Sets the cardinalities of - * dummy/default partitions for the case of unpartitioned tables. - */ - private void updateStatsFromHmsTable( - org.apache.hadoop.hive.metastore.api.Table msTbl) { - numRows_ = getRowCount(msTbl.getParameters()); - // For unpartitioned tables set the numRows in its partitions - // to the table's numRows. - if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) { - // Unpartitioned tables have a 'dummy' partition and a default partition. - // Temp tables used in CTAS statements have one partition. - Preconditions.checkState(partitionMap_.size() == 2 || partitionMap_.size() == 1); - for (HdfsPartition p: partitionMap_.values()) { - p.setNumRows(numRows_); - } - } - } - - /** - * Returns whether the table has the 'skip.header.line.count' property set. - */ - private boolean hasSkipHeaderLineCount() { - String key = TBL_PROP_SKIP_HEADER_LINE_COUNT; - org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); - if (msTbl == null) return false; - String inputFormat = msTbl.getSd().getInputFormat(); - return msTbl.getParameters().containsKey(key); - } - - /** - * Parses and returns the value of the 'skip.header.line.count' table property. If the - * value is not set for the table, returns 0. If parsing fails or a value < 0 is found, - * the error parameter is updated to contain an error message. - */ - public int parseSkipHeaderLineCount(StringBuilder error) { - if (!hasSkipHeaderLineCount()) return 0; - return parseSkipHeaderLineCount(getMetaStoreTable().getParameters(), error); - } - - /** - * Parses and returns the value of the 'skip.header.line.count' table property. The - * caller must ensure that the property is contained in the 'tblProperties' map. If - * parsing fails or a value < 0 is found, the error parameter is updated to contain an - * error message. - */ - public static int parseSkipHeaderLineCount(Map<String, String> tblProperties, - StringBuilder error) { - Preconditions.checkState(tblProperties != null); - String key = TBL_PROP_SKIP_HEADER_LINE_COUNT; - Preconditions.checkState(tblProperties.containsKey(key)); - // Try to parse. - String string_value = tblProperties.get(key); - int skipHeaderLineCount = 0; - String error_msg = String.format("Invalid value for table property %s: %s (value " + - "must be an integer >= 0)", key, string_value); - try { - skipHeaderLineCount = Integer.parseInt(string_value); - } catch (NumberFormatException exc) { - error.append(error_msg); - } - if (skipHeaderLineCount < 0) error.append(error_msg); - return skipHeaderLineCount; - } - - /** - * Sets avroSchema_ if the table or any of the partitions in the table are stored - * as Avro. Additionally, this method also reconciles the schema if the column - * definitions from the metastore differ from the Avro schema. 
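[Annotation] parseSkipHeaderLineCount() above requires the 'skip.header.line.count' property to parse as an integer >= 0 and reports any violation through the caller-supplied error buffer while falling back to 0 on parse failure. A minimal standalone sketch of that validation; the class name is invented for illustration.

// Illustrative sketch (not part of the original file): validate the
// skip.header.line.count table property, appending an error message on failure.
public final class SkipHeaderLineCount {
  public static final String KEY = "skip.header.line.count";

  static int parse(String value, StringBuilder error) {
    String msg = String.format(
        "Invalid value for table property %s: %s (value must be an integer >= 0)", KEY, value);
    int count = 0;
    try {
      count = Integer.parseInt(value);
    } catch (NumberFormatException e) {
      error.append(msg);
    }
    if (count < 0) error.append(msg);
    return count;
  }

  public static void main(String[] args) {
    StringBuilder err = new StringBuilder();
    System.out.println(parse("2", err) + " / errors: '" + err + "'");
    System.out.println(parse("-1", err) + " / errors: '" + err + "'");
  }
}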
- */ - private void setAvroSchema(IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { - Preconditions.checkState(isSchemaLoaded_); - String inputFormat = msTbl.getSd().getInputFormat(); - if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO - || hasAvroData_) { - // Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter - // taking precedence. - List<Map<String, String>> schemaSearchLocations = Lists.newArrayList(); - schemaSearchLocations.add( - getMetaStoreTable().getSd().getSerdeInfo().getParameters()); - schemaSearchLocations.add(getMetaStoreTable().getParameters()); - - avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations); - - if (avroSchema_ == null) { - // No Avro schema was explicitly set in the table metadata, so infer the Avro - // schema from the column definitions. - Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas( - msTbl.getSd().getCols(), getFullName()); - avroSchema_ = inferredSchema.toString(); - } - String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib(); - if (serdeLib == null || - serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) { - // If the SerDe library is null or set to LazySimpleSerDe or is null, it - // indicates there is an issue with the table metadata since Avro table need a - // non-native serde. Instead of failing to load the table, fall back to - // using the fields from the storage descriptor (same as Hive). - return; - } else { - // Generate new FieldSchemas from the Avro schema. This step reconciles - // differences in the column definitions and the Avro schema. For - // Impala-created tables this step is not necessary because the same - // resolution is done during table creation. But Hive-created tables - // store the original column definitions, and not the reconciled ones. - List<ColumnDef> colDefs = - ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols()); - List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema_); - StringBuilder warning = new StringBuilder(); - List<ColumnDef> reconciledColDefs = - AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning); - if (warning.length() != 0) { - LOG.warn(String.format("Warning while loading table %s:\n%s", - getFullName(), warning.toString())); - } - AvroSchemaUtils.setFromSerdeComment(reconciledColDefs); - // Reset and update nonPartFieldSchemas_ to the reconcicled colDefs. - nonPartFieldSchemas_.clear(); - nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs)); - // Update the columns as per the reconciled colDefs and re-load stats. - clearColumns(); - addColumnsFromFieldSchemas(msTbl.getPartitionKeys()); - addColumnsFromFieldSchemas(nonPartFieldSchemas_); - loadAllColumnStats(client); - } - } - } - - /** - * Loads table schema and column stats from Hive Metastore. - */ - private void loadSchema(IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { - nonPartFieldSchemas_.clear(); - // set nullPartitionKeyValue from the hive conf. - nullPartitionKeyValue_ = client.getConfigValue( - "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"); - - // set NULL indicator string from table properties - nullColumnValue_ = - msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT); - if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE; - - // Excludes partition columns. 
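[Annotation] setAvroSchema() above searches SERDEPROPERTIES first, then TBLPROPERTIES, and only infers a schema from the column definitions if neither carries one. A minimal sketch of that precedence, assuming the standard avro.schema.literal property; the real lookup is delegated to AvroSchemaUtils.getAvroSchema (which also handles avro.schema.url), and the names below are invented for illustration.

// Illustrative sketch (not part of the original file): pick the first explicit
// Avro schema from an ordered list of property maps, falling back to inference.
import java.util.List;
import java.util.Map;

public final class AvroSchemaLookup {
  static String resolve(List<Map<String, String>> searchLocations,
      java.util.function.Supplier<String> inferSchemaFromColumns) {
    for (Map<String, String> props : searchLocations) {
      // Only the literal form is shown here for brevity.
      String literal = props.get("avro.schema.literal");
      if (literal != null && !literal.isEmpty()) return literal;
    }
    return inferSchemaFromColumns.get();   // fall back to the column definitions
  }

  public static void main(String[] args) {
    String schema = resolve(
        List.of(Map.of(), Map.of("avro.schema.literal", "{\"type\":\"record\", ...}")),
        () -> "<inferred from columns>");
    System.out.println(schema);
  }
}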
- nonPartFieldSchemas_.addAll(msTbl.getSd().getCols()); - - // The number of clustering columns is the number of partition keys. - numClusteringCols_ = msTbl.getPartitionKeys().size(); - partitionLocationCompressor_.setClusteringColumns(numClusteringCols_); - clearColumns(); - // Add all columns to the table. Ordering is important: partition columns first, - // then all other columns. - addColumnsFromFieldSchemas(msTbl.getPartitionKeys()); - addColumnsFromFieldSchemas(nonPartFieldSchemas_); - loadAllColumnStats(client); - isSchemaLoaded_ = true; - } - - /** - * Loads partitions from the Hive Metastore and adds them to the internal list of - * table partitions. - */ - private void loadPartitionsFromMetastore(List<HdfsPartition> partitions, - IMetaStoreClient client) throws Exception { - Preconditions.checkNotNull(partitions); - if (partitions.isEmpty()) return; - LOG.info(String.format("Incrementally updating %d/%d partitions.", - partitions.size(), partitionMap_.size())); - Set<String> partitionNames = Sets.newHashSet(); - for (HdfsPartition part: partitions) { - partitionNames.add(part.getPartitionName()); - } - loadPartitionsFromMetastore(partitionNames, client); - } - - /** - * Loads from the Hive Metastore the partitions that correspond to the specified - * 'partitionNames' and adds them to the internal list of table partitions. - */ - private void loadPartitionsFromMetastore(Set<String> partitionNames, - IMetaStoreClient client) throws Exception { - Preconditions.checkNotNull(partitionNames); - if (partitionNames.isEmpty()) return; - // Load partition metadata from Hive Metastore. - List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions = - Lists.newArrayList(); - msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client, - Lists.newArrayList(partitionNames), db_.getName(), name_)); - - Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap(); - for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { - HdfsPartition partition = - createPartition(msPartition.getSd(), msPartition, fileBlocksToLoad); - addPartition(partition); - // If the partition is null, its HDFS path does not exist, and it was not added to - // this table's partition list. Skip the partition. - if (partition == null) continue; - if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true; - if (msPartition.getParameters() != null) { - partition.setNumRows(getRowCount(msPartition.getParameters())); - } - if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) { - // TODO: READ_ONLY isn't exactly correct because the it's possible the - // partition does not have READ permissions either. When we start checking - // whether we can READ from a table, this should be updated to set the - // table's access level to the "lowest" effective level across all - // partitions. That is, if one partition has READ_ONLY and another has - // WRITE_ONLY the table's access level should be NONE. - accessLevel_ = TAccessLevel.READ_ONLY; - } - } - loadDiskIds(fileBlocksToLoad); - } - - /** - * Loads the file descriptors and block metadata of a list of partitions. 
- */ - private void loadPartitionFileMetadata(List<HdfsPartition> partitions) - throws Exception { - Preconditions.checkNotNull(partitions); - LOG.info(String.format("loading file metadata for %d partitions", - partitions.size())); - org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); - Preconditions.checkNotNull(msTbl); - HdfsStorageDescriptor fileFormatDescriptor = - HdfsStorageDescriptor.fromStorageDescriptor(this.name_, msTbl.getSd()); - Map<FsKey, FileBlocksInfo> perFsFileBlocks = Maps.newHashMap(); - for (HdfsPartition part: partitions) { - org.apache.hadoop.hive.metastore.api.Partition msPart = - part.toHmsPartition(); - StorageDescriptor sd = null; - if (msPart == null) { - // If this partition is not stored in the Hive Metastore (e.g. default partition - // of an unpartitioned table), use the table's storage descriptor to load file - // metadata. - sd = msTbl.getSd(); - } else { - sd = msPart.getSd(); - } - loadPartitionFileMetadata(sd, part, fileFormatDescriptor.getFileFormat(), - perFsFileBlocks); - } - loadDiskIds(perFsFileBlocks); - } - - /** - * Loads the file descriptors and block metadata of a partition from its - * StorageDescriptor. If 'partition' does not have an entry in the Hive Metastore, - * 'storageDescriptor' is the StorageDescriptor of the associated table. Populates - * 'perFsFileBlocks' with file block info and updates table metadata. - */ - private void loadPartitionFileMetadata(StorageDescriptor storageDescriptor, - HdfsPartition partition, HdfsFileFormat fileFormat, - Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws Exception { - Preconditions.checkNotNull(storageDescriptor); - Preconditions.checkNotNull(partition); - org.apache.hadoop.hive.metastore.api.Partition msPart = - partition.toHmsPartition(); - boolean isMarkedCached = isMarkedCached_; - if (msPart != null) { - isMarkedCached = HdfsCachingUtil.validateCacheParams(msPart.getParameters()); - } - Path partDirPath = new Path(storageDescriptor.getLocation()); - FileSystem fs = partDirPath.getFileSystem(CONF); - if (!fs.exists(partDirPath)) return; - - String partitionDir = partDirPath.toString(); - numHdfsFiles_ -= partition.getNumFileDescriptors(); - totalHdfsBytes_ -= partition.getSize(); - Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0); - updatePartitionFds(partDirPath, isMarkedCached, fileFormat, perFsFileBlocks); - List<FileDescriptor> fileDescs = Lists.newArrayList( - perPartitionFileDescMap_.get(partDirPath.toString()).values()); - partition.setFileDescriptors(fileDescs); - totalHdfsBytes_ += partition.getSize(); - numHdfsFiles_ += fileDescs.size(); - } - - /** - * Updates the file descriptors of a partition directory specified by 'partitionPath' - * and loads block metadata of new/modified files. Reuses existing FileDescriptors for - * unchanged files (indicated by unchanged mtime). The one exception is if the - * partition is marked as cached (HDFS caching) in which case the block metadata - * cannot be reused. Otherwise, creates new FileDescriptors and adds them to - * perPartitionFileDescMap_. 'fileFomat' is the file format of the files in this - * partition directory. 'perFsFileBlocks' is populated with the loaded block metadata. 
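
loadPartitionFileMetadata() above keeps the table-wide file and byte counters consistent by subtracting a partition's previous contribution before its files are reloaded and then adding the refreshed totals back. A minimal sketch of that bookkeeping pattern (field and method names are illustrative):

class PartitionTotalsSketch {
  private long numFiles;
  private long totalBytes;

  // Remove the partition's old contribution, then add the reloaded one; the counters
  // must never go negative in between.
  void reloadPartition(long oldNumFiles, long oldBytes, long newNumFiles, long newBytes) {
    numFiles -= oldNumFiles;
    totalBytes -= oldBytes;
    assert numFiles >= 0 && totalBytes >= 0;
    numFiles += newNumFiles;
    totalBytes += newBytes;
  }
}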
- */ - private void updatePartitionFds(Path partitionPath, - boolean isMarkedCached, HdfsFileFormat fileFormat, - Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws CatalogException { - Preconditions.checkNotNull(partitionPath); - String partPathStr = partitionPath.toString(); - try { - FileSystem fs = partitionPath.getFileSystem(CONF); - if (!fs.exists(partitionPath)) { - perPartitionFileDescMap_.put( - partPathStr, Maps.<String, FileDescriptor>newHashMap()); - return; - } - Map<String, FileDescriptor> fileDescMap = - perPartitionFileDescMap_.get(partPathStr); - Map<String, FileDescriptor> newFileDescMap = Maps.newHashMap(); - // Get all the files in the partition directory - for (FileStatus fileStatus: fs.listStatus(partitionPath)) { - String fileName = fileStatus.getPath().getName().toString(); - if (fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) || - HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX) { - // Ignore directory, hidden file starting with . or _, and LZO index files - // If a directory is erroneously created as a subdirectory of a partition dir - // we should ignore it and move on. Hive will not recurse into directories. - // Skip index files, these are read by the LZO scanner directly. - continue; - } - FileDescriptor fd = fileDescMap != null ? fileDescMap.get(fileName) : null; - if (fd == null || isMarkedCached || fd.getFileLength() != fileStatus.getLen() - || fd.getModificationTime() != fileStatus.getModificationTime()) { - // Metadata of cached or modified files are not reused. - fd = new FileDescriptor(fileName, fileStatus.getLen(), - fileStatus.getModificationTime()); - loadBlockMetadata(fs, fileStatus, fd, fileFormat, perFsFileBlocks); - } - newFileDescMap.put(fileName, fd); - } - perPartitionFileDescMap_.put(partPathStr, newFileDescMap); - } catch (Exception e) { - throw new CatalogException("Failed to retrieve file descriptors from path " + - partitionPath, e); - } - } - - @Override - protected List<String> getColumnNamesWithHmsStats() { - List<String> ret = Lists.newArrayList(); - // Only non-partition columns have column stats in the HMS. - for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) { - ret.add(column.getName().toLowerCase()); - } - return ret; - } - - @Override - protected synchronized void loadFromThrift(TTable thriftTable) - throws TableLoadingException { - super.loadFromThrift(thriftTable); - THdfsTable hdfsTable = thriftTable.getHdfs_table(); - Preconditions.checkState(hdfsTable.getPartition_prefixes() instanceof ArrayList<?>); - partitionLocationCompressor_ = new HdfsPartitionLocationCompressor( - numClusteringCols_, (ArrayList<String>)hdfsTable.getPartition_prefixes()); - hdfsBaseDir_ = hdfsTable.getHdfsBaseDir(); - nullColumnValue_ = hdfsTable.nullColumnValue; - nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue; - multipleFileSystems_ = hdfsTable.multiple_filesystems; - Preconditions.checkState(hdfsTable.getNetwork_addresses() instanceof ArrayList<?>); - hostIndex_.populate((ArrayList<TNetworkAddress>)hdfsTable.getNetwork_addresses()); - resetPartitions(); - - try { - for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) { - HdfsPartition hdfsPart = - HdfsPartition.fromThrift(this, part.getKey(), part.getValue()); - addPartition(hdfsPart); - } - } catch (CatalogException e) { - throw new TableLoadingException(e.getMessage()); - } - avroSchema_ = hdfsTable.isSetAvroSchema() ? 
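
The directory listing above applies two small rules: skip entries that can never be data files, and reuse an existing FileDescriptor only when the file is provably unchanged. A condensed sketch of both checks (the ".index" suffix test is an assumption standing in for HdfsCompression.fromFileName()):

class FileListingSketch {
  // Directories, hidden files (leading '.' or '_'), and LZO index files are ignored.
  static boolean shouldSkip(boolean isDirectory, String fileName) {
    if (isDirectory) return true;  // Hive does not recurse into subdirectories
    if (fileName.startsWith(".") || fileName.startsWith("_")) return true;
    return fileName.toLowerCase().endsWith(".index");  // read directly by the LZO scanner
  }

  // An existing descriptor is reused only if the partition is not HDFS-cached and the
  // file's length and modification time are unchanged.
  static boolean canReuseDescriptor(boolean partitionIsCached, Long knownLength,
      Long knownMtime, long currentLength, long currentMtime) {
    if (partitionIsCached) return false;  // cached block metadata is never reused
    if (knownLength == null || knownMtime == null) return false;  // file not seen before
    return knownLength == currentLength && knownMtime == currentMtime;
  }
}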
hdfsTable.getAvroSchema() : null; - isMarkedCached_ = - HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters()); - } - - @Override - public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) { - // Create thrift descriptors to send to the BE. The BE does not - // need any information below the THdfsPartition level. - TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(), TTableType.HDFS_TABLE, - getTColumnDescriptors(), numClusteringCols_, name_, db_.getName()); - tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions)); - return tableDesc; - } - - @Override - public TTable toThrift() { - // Send all metadata between the catalog service and the FE. - TTable table = super.toThrift(); - table.setTable_type(TTableType.HDFS_TABLE); - table.setHdfs_table(getTHdfsTable(true, null)); - return table; - } - - /** - * Create a THdfsTable corresponding to this HdfsTable. If includeFileDesc is true, - * then all partitions and THdfsFileDescs of each partition should be included. - * Otherwise, don't include any THdfsFileDescs, and include only those partitions in - * the refPartitions set (the backend doesn't need metadata for unreferenced - * partitions). To prevent the catalog from hitting an OOM error while trying to - * serialize large partition incremental stats, we estimate the stats size and filter - * the incremental stats data from partition objects if the estimate exceeds - * MAX_INCREMENTAL_STATS_SIZE_BYTES. - */ - private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) { - // includeFileDesc implies all partitions should be included (refPartitions == null). - Preconditions.checkState(!includeFileDesc || refPartitions == null); - int numPartitions = - (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size(); - long statsSizeEstimate = - numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES; - boolean includeIncrementalStats = - (statsSizeEstimate < MAX_INCREMENTAL_STATS_SIZE_BYTES); - Map<Long, THdfsPartition> idToPartition = Maps.newHashMap(); - for (HdfsPartition partition: partitionMap_.values()) { - long id = partition.getId(); - if (refPartitions == null || refPartitions.contains(id)) { - idToPartition.put(id, - partition.toThrift(includeFileDesc, includeIncrementalStats)); - } - } - THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(), - nullPartitionKeyValue_, nullColumnValue_, idToPartition); - hdfsTable.setAvroSchema(avroSchema_); - hdfsTable.setMultiple_filesystems(multipleFileSystems_); - if (includeFileDesc) { - // Network addresses are used only by THdfsFileBlocks which are inside - // THdfsFileDesc, so include network addresses only when including THdfsFileDesc. - hdfsTable.setNetwork_addresses(hostIndex_.getList()); - } - hdfsTable.setPartition_prefixes(partitionLocationCompressor_.getPrefixes()); - return hdfsTable; - } - - public long getNumHdfsFiles() { return numHdfsFiles_; } - public long getTotalHdfsBytes() { return totalHdfsBytes_; } - public String getHdfsBaseDir() { return hdfsBaseDir_; } - public boolean isAvroTable() { return avroSchema_ != null; } - - /** - * Get the index of hosts that store replicas of blocks of this table. - */ - public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; } - - /** - * Returns the file format that the majority of partitions are stored in.
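
The incremental-stats guard in getTHdfsTable() is a simple size estimate: partitions times columns times a per-column constant, compared against a cap. A self-contained sketch with placeholder constants (the real values are defined elsewhere in HdfsTable and are not reproduced here):

class StatsSizeGuardSketch {
  static final long STATS_SIZE_PER_COLUMN_BYTES = 200;             // assumed per-column estimate
  static final long MAX_INCREMENTAL_STATS_SIZE_BYTES = 200L << 20; // assumed cap (~200 MB)

  // Incremental stats are serialized only while the estimated payload stays under the cap.
  static boolean includeIncrementalStats(int numPartitions, int numColumns) {
    long estimate = (long) numPartitions * numColumns * STATS_SIZE_PER_COLUMN_BYTES;
    return estimate < MAX_INCREMENTAL_STATS_SIZE_BYTES;
  }
}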
- */ - public HdfsFileFormat getMajorityFormat() { - Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap(); - for (HdfsPartition partition: partitionMap_.values()) { - HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat(); - Integer numPartitions = numPartitionsByFormat.get(format); - if (numPartitions == null) { - numPartitions = Integer.valueOf(1); - } else { - numPartitions = Integer.valueOf(numPartitions.intValue() + 1); - } - numPartitionsByFormat.put(format, numPartitions); - } - - int maxNumPartitions = Integer.MIN_VALUE; - HdfsFileFormat majorityFormat = null; - for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) { - if (entry.getValue().intValue() > maxNumPartitions) { - majorityFormat = entry.getKey(); - maxNumPartitions = entry.getValue().intValue(); - } - } - Preconditions.checkNotNull(majorityFormat); - return majorityFormat; - } - - /** - * Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in - * the Hive Metastore. An HDFS path is represented as a list of strings values, one per - * partition key column. - */ - public List<List<String>> getPathsWithoutPartitions() throws CatalogException { - List<List<LiteralExpr>> existingPartitions = new ArrayList<List<LiteralExpr>>(); - // Get the list of partition values of existing partitions in Hive Metastore. - for (HdfsPartition partition: partitionMap_.values()) { - if (partition.isDefaultPartition()) continue; - existingPartitions.add(partition.getPartitionValues()); - } - - List<String> partitionKeys = Lists.newArrayList(); - for (int i = 0; i < numClusteringCols_; ++i) { - partitionKeys.add(getColumns().get(i).getName()); - } - Path basePath = new Path(hdfsBaseDir_); - List<List<String>> partitionsNotInHms = new ArrayList<List<String>>(); - try { - getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions, - partitionsNotInHms); - } catch (Exception e) { - throw new CatalogException(String.format("Failed to recover partitions for %s " + - "with exception:%s.", getFullName(), e)); - } - return partitionsNotInHms; - } - - /** - * Returns all partitions which match the partition keys directory structure and pass - * type compatibility check. Also these partitions are not already part of the table. - */ - private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys, - List<List<LiteralExpr>> existingPartitions, - List<List<String>> partitionsNotInHms) throws IOException { - FileSystem fs = path.getFileSystem(CONF); - // Check whether the base directory exists. - if (!fs.exists(path)) return; - - List<String> partitionValues = Lists.newArrayList(); - List<LiteralExpr> partitionExprs = Lists.newArrayList(); - getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues, - partitionExprs, existingPartitions, partitionsNotInHms); - } - - /** - * Returns all partitions which match the partition keys directory structure and pass - * the type compatibility check. - * - * path e.g. c1=1/c2=2/c3=3 - * partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3") - * depth The start position in partitionKeys to match the path name. - * partitionValues The partition values used to create a partition. - * partitionExprs The list of LiteralExprs which is used to avoid duplicate partitions. - * E.g. Having /c1=0001 and /c1=01, we should make sure only one partition - * will be added. - * existingPartitions All partitions which exist in Hive Metastore or newly added. 
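
The per-format tally in getMajorityFormat() can also be written with Map.merge; a behavior-equivalent sketch, with a String key standing in for HdfsFileFormat so the example stays self-contained:

import java.util.HashMap;
import java.util.Map;

class MajorityFormatSketch {
  // Returns the format used by the largest number of partitions.
  static String majorityFormat(Iterable<String> partitionFormats) {
    Map<String, Integer> counts = new HashMap<>();
    for (String format : partitionFormats) {
      counts.merge(format, 1, Integer::sum);  // same count-per-format tally as the loop above
    }
    return counts.entrySet().stream()
        .max(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey)
        .orElseThrow(IllegalStateException::new);  // the table must have at least one partition
  }
}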
- * partitionsNotInHms Contains all the recovered partitions. - */ - private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys, - int depth, FileSystem fs, List<String> partitionValues, - List<LiteralExpr> partitionExprs, List<List<LiteralExpr>> existingPartitions, - List<List<String>> partitionsNotInHms) throws IOException { - if (depth == partitionKeys.size()) { - if (existingPartitions.contains(partitionExprs)) { - LOG.trace(String.format("Skip recovery of path '%s' because it already exists " + - "in metastore", path.toString())); - } else { - partitionsNotInHms.add(partitionValues); - existingPartitions.add(partitionExprs); - } - return; - } - - FileStatus[] statuses = fs.listStatus(path); - for (FileStatus status: statuses) { - if (!status.isDirectory()) continue; - Pair<String, LiteralExpr> keyValues = - getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth)); - if (keyValues == null) continue; - - List<String> currentPartitionValues = Lists.newArrayList(partitionValues); - List<LiteralExpr> currentPartitionExprs = Lists.newArrayList(partitionExprs); - currentPartitionValues.add(keyValues.first); - currentPartitionExprs.add(keyValues.second); - getAllPartitionsNotInHms(status.getPath(), partitionKeys, depth + 1, fs, - currentPartitionValues, currentPartitionExprs, - existingPartitions, partitionsNotInHms); - } - } - - /** - * Checks that the last component of 'path' is of the form "<partitionkey>=<v>" - * where 'v' is a type-compatible value from the domain of the 'partitionKey' column. - * If not, returns null, otherwise returns a Pair instance, the first element is the - * original value, the second element is the LiteralExpr created from the original - * value. - */ - private Pair<String, LiteralExpr> getTypeCompatibleValue(Path path, - String partitionKey) { - String partName[] = path.getName().split("="); - if (partName.length != 2 || !partName[0].equals(partitionKey)) return null; - - // Check Type compatibility for Partition value. - Column column = getColumn(partName[0]); - Preconditions.checkNotNull(column); - Type type = column.getType(); - LiteralExpr expr = null; - if (!partName[1].equals(getNullPartitionKeyValue())) { - try { - expr = LiteralExpr.create(partName[1], type); - // Skip large value which exceeds the MAX VALUE of specified Type. - if (expr instanceof NumericLiteral) { - if (NumericLiteral.isOverflow(((NumericLiteral)expr).getValue(), type)) { - LOG.warn(String.format("Skip the overflow value (%s) for Type (%s).", - partName[1], type.toSql())); - return null; - } - } - } catch (Exception ex) { - LOG.debug(String.format("Invalid partition value (%s) for Type (%s).", - partName[1], type.toSql())); - return null; - } - } else { - expr = new NullLiteral(); - } - return new Pair<String, LiteralExpr>(partName[1], expr); - } - - /** - * Returns statistics on this table as a tabular result set. Used for the - * SHOW TABLE STATS statement. The schema of the returned TResultSet is set - * inside this method. - */ - public TResultSet getTableStats() { - TResultSet result = new TResultSet(); - TResultSetMetadata resultSchema = new TResultSetMetadata(); - result.setSchema(resultSchema); - - for (int i = 0; i < numClusteringCols_; ++i) { - // Add the partition-key values as strings for simplicity. 
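
The duplicate check above (existingPartitions.contains(partitionExprs)) is what makes '/c1=0001' and '/c1=01' collapse into a single recovered partition: comparison happens on the parsed literal, not on the raw directory string. A toy illustration with an integer partition key, where Long.parseLong stands in for LiteralExpr.create with the column's type:

import java.util.HashSet;
import java.util.Set;

class DuplicatePartitionSketch {
  public static void main(String[] args) {
    Set<Long> seenCanonicalValues = new HashSet<>();
    for (String raw : new String[] {"0001", "01", "2"}) {
      long canonical = Long.parseLong(raw);  // canonical form of the partition value
      if (seenCanonicalValues.add(canonical)) {
        System.out.println("recover partition c1=" + raw);
      } else {
        System.out.println("skip duplicate c1=" + raw);  // same value as an existing partition
      }
    }
  }
}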
- Column partCol = getColumns().get(i); - TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift()); - resultSchema.addToColumns(colDesc); - } - - resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift())); - resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift())); - resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift())); - - // Pretty print partitions and their stats. - ArrayList<HdfsPartition> orderedPartitions = - Lists.newArrayList(partitionMap_.values()); - Collections.sort(orderedPartitions); - - long totalCachedBytes = 0L; - for (HdfsPartition p: orderedPartitions) { - // Ignore dummy default partition. - if (p.isDefaultPartition()) continue; - TResultRowBuilder rowBuilder = new TResultRowBuilder(); - - // Add the partition-key values (as strings for simplicity). - for (LiteralExpr expr: p.getPartitionValues()) { - rowBuilder.add(expr.getStringValue()); - } - - // Add number of rows, files, bytes, cache stats, and file format. - rowBuilder.add(p.getNumRows()).add(p.getFileDescriptors().size()) - .addBytes(p.getSize()); - if (!p.isMarkedCached()) { - // Helps to differentiate partitions that have 0B cached versus partitions - // that are not marked as cached. - rowBuilder.add("NOT CACHED"); - rowBuilder.add("NOT CACHED"); - } else { - // Calculate the number of bytes that are cached. - long cachedBytes = 0L; - for (FileDescriptor fd: p.getFileDescriptors()) { - for (THdfsFileBlock fb: fd.getFileBlocks()) { - if (fb.getIs_replica_cached().contains(true)) { - cachedBytes += fb.getLength(); - } - } - } - totalCachedBytes += cachedBytes; - rowBuilder.addBytes(cachedBytes); - - // Extract cache replication factor from the parameters of the table - // if the table is not partitioned or directly from the partition. - Short rep = HdfsCachingUtil.getCachedCacheReplication( - numClusteringCols_ == 0 ? - p.getTable().getMetaStoreTable().getParameters() : - p.getParameters()); - rowBuilder.add(rep.toString()); - } - rowBuilder.add(p.getInputFormatDescriptor().getFileFormat().toString()); - - rowBuilder.add(String.valueOf(p.hasIncrementalStats())); - rowBuilder.add(p.getLocation()); - result.addToRows(rowBuilder.get()); - } - - // For partitioned tables add a summary row at the bottom. - if (numClusteringCols_ > 0) { - TResultRowBuilder rowBuilder = new TResultRowBuilder(); - int numEmptyCells = numClusteringCols_ - 1; - rowBuilder.add("Total"); - for (int i = 0; i < numEmptyCells; ++i) { - rowBuilder.add(""); - } - - // Total num rows, files, and bytes (leave format empty). - rowBuilder.add(numRows_).add(numHdfsFiles_).addBytes(totalHdfsBytes_) - .addBytes(totalCachedBytes).add("").add("").add("").add(""); - result.addToRows(rowBuilder.get()); - } - return result; - } - - /** - * Returns files info for the given dbname/tableName and partition spec. - * Returns files info for all partitions, if partition spec is null, ordered - * by partition.
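
The "Bytes Cached" column above counts a block's full length once if any of its replicas is cached. A small sketch of that tally with a hypothetical Block type (the real code walks THdfsFileBlock objects obtained via FileDescriptor.getFileBlocks()):

import java.util.List;

class CachedBytesSketch {
  static final class Block {
    final long length;
    final List<Boolean> replicaIsCached;
    Block(long length, List<Boolean> replicaIsCached) {
      this.length = length;
      this.replicaIsCached = replicaIsCached;
    }
  }

  // A block contributes its length once when at least one replica is cached.
  static long cachedBytes(List<Block> blocks) {
    long total = 0L;
    for (Block b : blocks) {
      if (b.replicaIsCached.contains(true)) total += b.length;
    }
    return total;
  }
}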
- */ - public TResultSet getFiles(List<TPartitionKeyValue> partitionSpec) - throws CatalogException { - TResultSet result = new TResultSet(); - TResultSetMetadata resultSchema = new TResultSetMetadata(); - result.setSchema(resultSchema); - resultSchema.addToColumns(new TColumn("Path", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift())); - result.setRows(Lists.<TResultRow>newArrayList()); - - List<HdfsPartition> orderedPartitions = null; - if (partitionSpec == null) { - orderedPartitions = Lists.newArrayList(partitionMap_.values()); - Collections.sort(orderedPartitions); - } else { - // Get the HdfsPartition object for the given partition spec. - HdfsPartition partition = getPartitionFromThriftPartitionSpec(partitionSpec); - Preconditions.checkState(partition != null); - orderedPartitions = Lists.newArrayList(partition); - } - - for (HdfsPartition p: orderedPartitions) { - List<FileDescriptor> orderedFds = Lists.newArrayList(p.getFileDescriptors()); - Collections.sort(orderedFds); - for (FileDescriptor fd: orderedFds) { - TResultRowBuilder rowBuilder = new TResultRowBuilder(); - rowBuilder.add(p.getLocation() + "/" + fd.getFileName()); - rowBuilder.add(PrintUtils.printBytes(fd.getFileLength())); - rowBuilder.add(p.getPartitionName()); - result.addToRows(rowBuilder.get()); - } - } - return result; - } - - /** - * Constructs a partition name from a list of TPartitionKeyValue objects. - */ - public static String constructPartitionName(List<TPartitionKeyValue> partitionSpec) { - List<String> partitionCols = Lists.newArrayList(); - List<String> partitionVals = Lists.newArrayList(); - for (TPartitionKeyValue kv: partitionSpec) { - partitionCols.add(kv.getName()); - partitionVals.add(kv.getValue()); - } - return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols, - partitionVals); - } - - /** - * Reloads the metadata of partition 'oldPartition' by removing - * it from the table and reconstructing it from the HMS partition object - * 'hmsPartition'. If 'oldPartition' is null then nothing is removed and - * the partition constructed from 'hmsPartition' is simply added. - */ - public void reloadPartition(HdfsPartition oldPartition, Partition hmsPartition) - throws CatalogException { - HdfsPartition refreshedPartition = createPartition( - hmsPartition.getSd(), hmsPartition); - Preconditions.checkArgument(oldPartition == null - || oldPartition.compareTo(refreshedPart
<TRUNCATED>
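
constructPartitionName(), shown a little before the truncation point, delegates to Hive's FileUtils.makePartName to build the canonical "key=value/key=value" form. A small usage sketch, assuming hive-common is on the classpath:

import java.util.Arrays;
import org.apache.hadoop.hive.common.FileUtils;

class PartitionNameSketch {
  public static void main(String[] args) {
    String name = FileUtils.makePartName(
        Arrays.asList("year", "month"), Arrays.asList("2016", "7"));
    System.out.println(name);  // expected output: year=2016/month=7
  }
}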
