http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java deleted file mode 100644 index 475a8d7..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java +++ /dev/null @@ -1,261 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.List; -import java.util.Map; - -import com.cloudera.impala.thrift.THdfsFileFormat; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -/** - * Supported HDFS file formats. Every file format specifies: - * 1) the input format class - * 2) the output format class - * 3) the serialization library class - * 4) whether scanning complex types from it is supported - * - * Important note: Always keep consistent with the classes used in Hive. - */ -public enum HdfsFileFormat { - RC_FILE("org.apache.hadoop.hive.ql.io.RCFileInputFormat", - "org.apache.hadoop.hive.ql.io.RCFileOutputFormat", - "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", - false, true), - TEXT("org.apache.hadoop.mapred.TextInputFormat", - "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", - false, false), - LZO_TEXT("com.hadoop.mapred.DeprecatedLzoTextInputFormat", - "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", - "", false, false), - SEQUENCE_FILE("org.apache.hadoop.mapred.SequenceFileInputFormat", - "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", false, - true), - AVRO("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", - "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", - "org.apache.hadoop.hive.serde2.avro.AvroSerDe", - false, false), - PARQUET("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", - true, true); - - private final String inputFormat_; - private final String outputFormat_; - private final String serializationLib_; - - // Indicates whether we support scanning complex types for this file format. - private final boolean isComplexTypesSupported_; - - // Indicates whether the file format can skip complex columns in scans and just - // materialize scalar typed columns. Ignored if isComplexTypesSupported_ is true. 
- // TODO: Remove this once we support complex types for all file formats. - private final boolean canSkipColumnTypes_; - - HdfsFileFormat(String inputFormat, String outputFormat, String serializationLib, - boolean isComplexTypesSupported, boolean canSkipColumnTypes) { - inputFormat_ = inputFormat; - outputFormat_ = outputFormat; - serializationLib_ = serializationLib; - isComplexTypesSupported_ = isComplexTypesSupported; - canSkipColumnTypes_ = canSkipColumnTypes; - } - - public String inputFormat() { return inputFormat_; } - public String outputFormat() { return outputFormat_; } - public String serializationLib() { return serializationLib_; } - - // Impala supports legacy Parquet input formats and treats them internally as the most - // modern Parquet input format. - private static final String[] PARQUET_LEGACY_INPUT_FORMATS = { - "com.cloudera.impala.hive.serde.ParquetInputFormat", - "parquet.hive.DeprecatedParquetInputFormat", - "parquet.hive.MapredParquetInputFormat" - }; - - private static final Map<String, HdfsFileFormat> VALID_INPUT_FORMATS = - ImmutableMap.<String, HdfsFileFormat>builder() - .put(RC_FILE.inputFormat(), RC_FILE) - .put(TEXT.inputFormat(), TEXT) - .put(LZO_TEXT.inputFormat(), TEXT) - .put(SEQUENCE_FILE.inputFormat(), SEQUENCE_FILE) - .put(AVRO.inputFormat(), AVRO) - .put(PARQUET.inputFormat(), PARQUET) - .put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET) - .put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET) - .put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET) - .build(); - - /** - * Returns true if the string describes an input format class that we support. - */ - public static boolean isHdfsInputFormatClass(String inputFormatClass) { - return VALID_INPUT_FORMATS.containsKey(inputFormatClass); - } - - /** - * Returns the file format associated with the input format class, or null if - * the input format class is not supported. - */ - public static HdfsFileFormat fromHdfsInputFormatClass(String inputFormatClass) { - Preconditions.checkNotNull(inputFormatClass); - return VALID_INPUT_FORMATS.get(inputFormatClass); - } - - /** - * Returns the corresponding enum for a SerDe class name. 
If classname is not one - * of our supported formats, throws an IllegalArgumentException like Enum.valueOf - */ - public static HdfsFileFormat fromJavaClassName(String className) { - Preconditions.checkNotNull(className); - if (isHdfsInputFormatClass(className)) return VALID_INPUT_FORMATS.get(className); - throw new IllegalArgumentException(className); - } - - public static HdfsFileFormat fromThrift(THdfsFileFormat thriftFormat) { - switch (thriftFormat) { - case RC_FILE: return HdfsFileFormat.RC_FILE; - case TEXT: return HdfsFileFormat.TEXT; - case SEQUENCE_FILE: return HdfsFileFormat.SEQUENCE_FILE; - case AVRO: return HdfsFileFormat.AVRO; - case PARQUET: return HdfsFileFormat.PARQUET; - default: - throw new RuntimeException("Unknown THdfsFileFormat: " - + thriftFormat + " - should never happen!"); - } - } - - public THdfsFileFormat toThrift() { - switch (this) { - case RC_FILE: return THdfsFileFormat.RC_FILE; - case TEXT: return THdfsFileFormat.TEXT; - case SEQUENCE_FILE: return THdfsFileFormat.SEQUENCE_FILE; - case AVRO: return THdfsFileFormat.AVRO; - case PARQUET: return THdfsFileFormat.PARQUET; - default: - throw new RuntimeException("Unknown HdfsFormat: " - + this + " - should never happen!"); - } - } - - public String toSql(HdfsCompression compressionType) { - switch (this) { - case RC_FILE: return "RCFILE"; - case TEXT: - if (compressionType == HdfsCompression.LZO || - compressionType == HdfsCompression.LZO_INDEX) { - // TODO: Update this when we can write LZO text. - // It is not currently possible to create a table with LZO compressed text files - // in Impala, but this is valid in Hive. - return String.format("INPUTFORMAT '%s' OUTPUTFORMAT '%s'", - LZO_TEXT.inputFormat(), LZO_TEXT.outputFormat()); - } - return "TEXTFILE"; - case SEQUENCE_FILE: return "SEQUENCEFILE"; - case AVRO: return "AVRO"; - case PARQUET: return "PARQUET"; - default: - throw new RuntimeException("Unknown HdfsFormat: " - + this + " - should never happen!"); - } - } - - /* - * Checks whether a file is supported in Impala based on the file extension. - * Returns true if the file format is supported. If the file format is not - * supported, then it returns false and 'errorMsg' contains details on the - * incompatibility. - * - * Impala supports LZO, GZIP, SNAPPY and BZIP2 on text files for partitions that have - * been declared in the metastore as TEXT. LZO files can have their own input format. - * For now, raise an error on any other type. - */ - public boolean isFileCompressionTypeSupported(String fileName, - StringBuilder errorMsg) { - // Check to see if the file has a compression suffix. - // TODO: Add LZ4 - HdfsCompression compressionType = HdfsCompression.fromFileName(fileName); - switch (compressionType) { - case LZO: - case LZO_INDEX: - // Index files are read by the LZO scanner directly. - case GZIP: - case SNAPPY: - case BZIP2: - case NONE: - return true; - case DEFLATE: - // TODO: Ensure that text/deflate works correctly - if (this == TEXT) { - errorMsg.append("Expected compressed text file with {.lzo,.gzip,.snappy,.bz2} " - + "suffix: " + fileName); - return false; - } else { - return true; - } - default: - errorMsg.append("Unknown compression suffix: " + fileName); - return false; - } - } - - /** - * Returns true if this file format with the given compression format is splittable. 
- */ - public boolean isSplittable(HdfsCompression compression) { - switch (this) { - case TEXT: - return compression == HdfsCompression.NONE; - case RC_FILE: - case SEQUENCE_FILE: - case AVRO: - case PARQUET: - return true; - default: - throw new RuntimeException("Unknown HdfsFormat: " - + this + " - should never happen!"); - } - } - - /** - * Returns true if Impala supports scanning complex-typed columns - * from a table/partition with this file format. - */ - public boolean isComplexTypesSupported() { return isComplexTypesSupported_; } - - /** - * Returns true if this file format can skip complex typed columns and materialize - * only scalar typed columns. - */ - public boolean canSkipComplexTypes() { return canSkipColumnTypes_; } - - /** - * Returns a list with all formats for which isComplexTypesSupported() is true. - */ - public static List<HdfsFileFormat> complexTypesFormats() { - List<HdfsFileFormat> result = Lists.newArrayList(); - for (HdfsFileFormat f: values()) { - if (f.isComplexTypesSupported()) result.add(f); - } - return result; - } -}
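[Editorial aside, not part of the patch: a minimal usage sketch of the HdfsFileFormat API deleted above, assuming the com.cloudera.impala.catalog classes were still on the classpath. It illustrates two behaviors documented in the removed file: legacy Parquet input-format class names resolve to the modern PARQUET value, and TEXT is splittable only when uncompressed.]

import com.cloudera.impala.catalog.HdfsCompression;
import com.cloudera.impala.catalog.HdfsFileFormat;

public class HdfsFileFormatSketch {
  public static void main(String[] args) {
    // Legacy Parquet input formats are mapped to the modern PARQUET enum value.
    HdfsFileFormat fmt = HdfsFileFormat.fromHdfsInputFormatClass(
        "parquet.hive.DeprecatedParquetInputFormat");
    System.out.println(fmt);                                       // PARQUET
    System.out.println(fmt.isComplexTypesSupported());             // true
    System.out.println(fmt.isSplittable(HdfsCompression.NONE));    // true

    // Plain text is splittable only when uncompressed.
    System.out.println(
        HdfsFileFormat.TEXT.isSplittable(HdfsCompression.SNAPPY)); // false
  }
}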
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java deleted file mode 100644 index f408468..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java +++ /dev/null @@ -1,791 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.LiteralExpr; -import com.cloudera.impala.analysis.NullLiteral; -import com.cloudera.impala.analysis.PartitionKeyValue; -import com.cloudera.impala.analysis.ToSqlUtils; -import com.cloudera.impala.common.FileSystemUtil; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.thrift.ImpalaInternalServiceConstants; -import com.cloudera.impala.thrift.TAccessLevel; -import com.cloudera.impala.thrift.TExpr; -import com.cloudera.impala.thrift.TExprNode; -import com.cloudera.impala.thrift.THdfsCompression; -import com.cloudera.impala.thrift.THdfsFileBlock; -import com.cloudera.impala.thrift.THdfsFileDesc; -import com.cloudera.impala.thrift.THdfsPartition; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TPartitionStats; -import com.cloudera.impala.thrift.TTableStats; -import com.cloudera.impala.util.HdfsCachingUtil; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Query-relevant information for one table partition. Partitions are comparable - * based on their partition-key values. The comparison orders partitions in ascending - * order with NULLs sorting last. The ordering is useful for displaying partitions - * in SHOW statements. - */ -public class HdfsPartition implements Comparable<HdfsPartition> { - /** - * Metadata for a single file in this partition. - * TODO: Do we even need this class? Just get rid of it and use the Thrift version? 
- */ - static public class FileDescriptor implements Comparable<FileDescriptor> { - private final THdfsFileDesc fileDescriptor_; - - public String getFileName() { return fileDescriptor_.getFile_name(); } - public long getFileLength() { return fileDescriptor_.getLength(); } - public THdfsCompression getFileCompression() { - return fileDescriptor_.getCompression(); - } - public long getModificationTime() { - return fileDescriptor_.getLast_modification_time(); - } - public List<THdfsFileBlock> getFileBlocks() { - return fileDescriptor_.getFile_blocks(); - } - - public THdfsFileDesc toThrift() { return fileDescriptor_; } - - public FileDescriptor(String fileName, long fileLength, long modificationTime) { - Preconditions.checkNotNull(fileName); - Preconditions.checkArgument(fileLength >= 0); - fileDescriptor_ = new THdfsFileDesc(); - fileDescriptor_.setFile_name(fileName); - fileDescriptor_.setLength(fileLength); - fileDescriptor_.setLast_modification_time(modificationTime); - fileDescriptor_.setCompression( - HdfsCompression.fromFileName(fileName).toThrift()); - List<THdfsFileBlock> emptyFileBlockList = Lists.newArrayList(); - fileDescriptor_.setFile_blocks(emptyFileBlockList); - } - - private FileDescriptor(THdfsFileDesc fileDesc) { - this(fileDesc.getFile_name(), fileDesc.length, fileDesc.last_modification_time); - for (THdfsFileBlock block: fileDesc.getFile_blocks()) { - fileDescriptor_.addToFile_blocks(block); - } - } - - public void addFileBlock(FileBlock blockMd) { - fileDescriptor_.addToFile_blocks(blockMd.toThrift()); - } - - public static FileDescriptor fromThrift(THdfsFileDesc desc) { - return new FileDescriptor(desc); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("FileName", getFileName()) - .add("Length", getFileLength()).toString(); - } - - /** - * Orders file descriptors lexicographically by file name. - */ - @Override - public int compareTo(FileDescriptor otherFd) { - return getFileName().compareTo(otherFd.getFileName()); - } - } - - /** - * Represents metadata of a single block replica. - */ - public static class BlockReplica { - private final boolean isCached_; - private final int hostIdx_; - - /** - * Creates a BlockReplica given a host ID/index and a flag specifying whether this - * replica is cahced. Host IDs are assigned when loading the block metadata in - * HdfsTable. - */ - public BlockReplica(int hostIdx, boolean isCached) { - hostIdx_ = hostIdx; - isCached_ = isCached; - } - - /** - * Parses the location (an ip address:port string) of the replica and returns a - * TNetworkAddress with this information, or null if parsing fails. - */ - public static TNetworkAddress parseLocation(String location) { - Preconditions.checkNotNull(location); - String[] ip_port = location.split(":"); - if (ip_port.length != 2) return null; - try { - return new TNetworkAddress(ip_port[0], Integer.parseInt(ip_port[1])); - } catch (NumberFormatException e) { - return null; - } - } - - public boolean isCached() { return isCached_; } - public int getHostIdx() { return hostIdx_; } - } - - /** - * File Block metadata - */ - public static class FileBlock { - private final THdfsFileBlock fileBlock_; - private boolean isCached_; // Set to true if there is at least one cached replica. 
- - private FileBlock(THdfsFileBlock fileBlock) { - fileBlock_ = fileBlock; - isCached_ = false; - for (boolean isCached: fileBlock.getIs_replica_cached()) { - isCached_ |= isCached; - } - } - - /** - * Construct a FileBlock given the start offset (in bytes) of the file associated - * with this block, the length of the block (in bytes), and a list of BlockReplicas. - * Does not fill diskIds. - */ - public FileBlock(long offset, long blockLength, - List<BlockReplica> replicaHostIdxs) { - Preconditions.checkNotNull(replicaHostIdxs); - fileBlock_ = new THdfsFileBlock(); - fileBlock_.setOffset(offset); - fileBlock_.setLength(blockLength); - - fileBlock_.setReplica_host_idxs(new ArrayList<Integer>(replicaHostIdxs.size())); - fileBlock_.setIs_replica_cached(new ArrayList<Boolean>(replicaHostIdxs.size())); - isCached_ = false; - for (BlockReplica replica: replicaHostIdxs) { - fileBlock_.addToReplica_host_idxs(replica.getHostIdx()); - fileBlock_.addToIs_replica_cached(replica.isCached()); - isCached_ |= replica.isCached(); - } - } - - public long getOffset() { return fileBlock_.getOffset(); } - public long getLength() { return fileBlock_.getLength(); } - // Returns true if at there at least one cached replica. - public boolean isCached() { return isCached_; } - public List<Integer> getReplicaHostIdxs() { - return fileBlock_.getReplica_host_idxs(); - } - - /** - * Populates the given THdfsFileBlock's list of disk ids with the given disk id - * values. The number of disk ids must match the number of network addresses - * set in the file block. - */ - public static void setDiskIds(int[] diskIds, THdfsFileBlock fileBlock) { - Preconditions.checkArgument( - diskIds.length == fileBlock.getReplica_host_idxs().size()); - fileBlock.setDisk_ids(Arrays.asList(ArrayUtils.toObject(diskIds))); - } - - /** - * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if - * disk id is not supported. - */ - public int getDiskId(int hostIndex) { - if (fileBlock_.disk_ids == null) return -1; - return fileBlock_.getDisk_ids().get(hostIndex); - } - - public boolean isCached(int hostIndex) { - return fileBlock_.getIs_replica_cached().get(hostIndex); - } - - public THdfsFileBlock toThrift() { return fileBlock_; } - - public static FileBlock fromThrift(THdfsFileBlock thriftFileBlock) { - return new FileBlock(thriftFileBlock); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("offset", fileBlock_.offset) - .add("length", fileBlock_.length) - .add("#disks", fileBlock_.getDisk_idsSize()) - .toString(); - } - } - - private final HdfsTable table_; - private final List<LiteralExpr> partitionKeyValues_; - // estimated number of rows in partition; -1: unknown - private long numRows_ = -1; - private static AtomicLong partitionIdCounter_ = new AtomicLong(); - - // A unique ID for each partition, used to identify a partition in the thrift - // representation of a table. - private final long id_; - - /* - * Note: Although you can write multiple formats to a single partition (by changing - * the format before each write), Hive won't let you read that data and neither should - * we. We should therefore treat mixing formats inside one partition as user error. - * It's easy to add per-file metadata to FileDescriptor if this changes. 
- */ - private final HdfsStorageDescriptor fileFormatDescriptor_; - private List<FileDescriptor> fileDescriptors_; - private HdfsPartitionLocationCompressor.Location location_; - private final static Logger LOG = LoggerFactory.getLogger(HdfsPartition.class); - private boolean isDirty_ = false; - // True if this partition is marked as cached. Does not necessarily mean the data is - // cached. - private boolean isMarkedCached_ = false; - private final TAccessLevel accessLevel_; - - // (k,v) pairs of parameters for this partition, stored in the HMS. Used by Impala to - // store intermediate state for statistics computations. - private Map<String, String> hmsParameters_; - - public HdfsStorageDescriptor getInputFormatDescriptor() { - return fileFormatDescriptor_; - } - - public boolean isDefaultPartition() { - return id_ == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID; - } - - /** - * Returns true if the partition resides at a location which can be cached (e.g. HDFS). - */ - public boolean isCacheable() { - return FileSystemUtil.isPathCacheable(new Path(getLocation())); - } - - /** - * Return a partition name formed by concatenating partition keys and their values, - * compatible with the way Hive names partitions. Reuses Hive's - * org.apache.hadoop.hive.common.FileUtils.makePartName() function to build the name - * string because there are a number of special cases for how partition names are URL - * escaped. - * TODO: Consider storing the PartitionKeyValue in HdfsPartition. It would simplify - * this code would be useful in other places, such as fromThrift(). - */ - public String getPartitionName() { - List<String> partitionCols = Lists.newArrayList(); - for (int i = 0; i < getTable().getNumClusteringCols(); ++i) { - partitionCols.add(getTable().getColumns().get(i).getName()); - } - - return org.apache.hadoop.hive.common.FileUtils.makePartName( - partitionCols, getPartitionValuesAsStrings(true)); - } - - /** - * Returns a list of partition values as strings. If mapNullsToHiveKey is true, any NULL - * value is returned as the table's default null partition key string value, otherwise - * they are returned as 'NULL'. - */ - public List<String> getPartitionValuesAsStrings(boolean mapNullsToHiveKey) { - List<String> ret = Lists.newArrayList(); - for (LiteralExpr partValue: getPartitionValues()) { - if (mapNullsToHiveKey) { - ret.add(PartitionKeyValue.getPartitionKeyValueString( - partValue, getTable().getNullPartitionKeyValue())); - } else { - ret.add(partValue.getStringValue()); - } - } - return ret; - } - - /** - * Utility method which returns a string of conjuncts of equality exprs to exactly - * select this partition (e.g. ((month=2009) AND (year=2012)). - * TODO: Remove this when the TODO elsewhere in this file to save and expose the - * list of TPartitionKeyValues has been resolved. 
- */ - public String getConjunctSql() { - List<String> partColSql = Lists.newArrayList(); - for (Column partCol: getTable().getClusteringColumns()) { - partColSql.add(ToSqlUtils.getIdentSql(partCol.getName())); - } - - List<String> conjuncts = Lists.newArrayList(); - for (int i = 0; i < partColSql.size(); ++i) { - LiteralExpr partVal = getPartitionValues().get(i); - String partValSql = partVal.toSql(); - if (partVal instanceof NullLiteral || partValSql.isEmpty()) { - conjuncts.add(partColSql.get(i) + " IS NULL"); - } else { - conjuncts.add(partColSql.get(i) + "=" + partValSql); - } - } - return "(" + Joiner.on(" AND " ).join(conjuncts) + ")"; - } - - /** - * Returns a string of the form part_key1=value1/part_key2=value2... - */ - public String getValuesAsString() { - StringBuilder partDescription = new StringBuilder(); - for (int i = 0; i < getTable().getNumClusteringCols(); ++i) { - String columnName = getTable().getColumns().get(i).getName(); - String value = PartitionKeyValue.getPartitionKeyValueString( - getPartitionValues().get(i), - getTable().getNullPartitionKeyValue()); - partDescription.append(columnName + "=" + value); - if (i != getTable().getNumClusteringCols() - 1) partDescription.append("/"); - } - return partDescription.toString(); - } - - /** - * Returns the storage location (HDFS path) of this partition. Should only be called - * for partitioned tables. - */ - public String getLocation() { - return (location_ != null) ? location_.toString() : null; - } - public long getId() { return id_; } - public HdfsTable getTable() { return table_; } - public void setNumRows(long numRows) { numRows_ = numRows; } - public long getNumRows() { return numRows_; } - public boolean isMarkedCached() { return isMarkedCached_; } - void markCached() { isMarkedCached_ = true; } - - /** - * Updates the file format of this partition and sets the corresponding input/output - * format classes. - */ - public void setFileFormat(HdfsFileFormat fileFormat) { - fileFormatDescriptor_.setFileFormat(fileFormat); - cachedMsPartitionDescriptor_.sdInputFormat = fileFormat.inputFormat(); - cachedMsPartitionDescriptor_.sdOutputFormat = fileFormat.outputFormat(); - cachedMsPartitionDescriptor_.sdSerdeInfo.setSerializationLib( - fileFormatDescriptor_.getFileFormat().serializationLib()); - } - - public HdfsFileFormat getFileFormat() { - return fileFormatDescriptor_.getFileFormat(); - } - - public void setLocation(String place) { - location_ = table_.getPartitionLocationCompressor().new Location(place); - } - - public org.apache.hadoop.hive.metastore.api.SerDeInfo getSerdeInfo() { - return cachedMsPartitionDescriptor_.sdSerdeInfo; - } - - // May return null if no per-partition stats were recorded, or if the per-partition - // stats could not be deserialised from the parameter map. - public TPartitionStats getPartitionStats() { - try { - return PartitionStatsUtil.partStatsFromParameters(hmsParameters_); - } catch (ImpalaException e) { - LOG.warn("Could not deserialise incremental stats state for " + getPartitionName() + - ", consider DROP INCREMENTAL STATS ... PARTITION ... and recomputing " + - "incremental stats for this table."); - return null; - } - } - - public boolean hasIncrementalStats() { - TPartitionStats partStats = getPartitionStats(); - return partStats != null && partStats.intermediate_col_stats != null; - } - - /** - * Returns the HDFS permissions Impala has to this partition's directory - READ_ONLY, - * READ_WRITE, etc. 
- */ - public TAccessLevel getAccessLevel() { return accessLevel_; } - - /** - * Returns the HMS parameter with key 'key' if it exists, otherwise returns null. - */ - public String getParameter(String key) { - return hmsParameters_.get(key); - } - - public Map<String, String> getParameters() { return hmsParameters_; } - - public void putToParameters(String k, String v) { hmsParameters_.put(k, v); } - - /** - * Marks this partition's metadata as "dirty" indicating that changes have been - * made and this partition's metadata should not be reused during the next - * incremental metadata refresh. - */ - public void markDirty() { isDirty_ = true; } - public boolean isDirty() { return isDirty_; } - - /** - * Returns an immutable list of partition key expressions - */ - public List<LiteralExpr> getPartitionValues() { return partitionKeyValues_; } - public LiteralExpr getPartitionValue(int i) { return partitionKeyValues_.get(i); } - public List<HdfsPartition.FileDescriptor> getFileDescriptors() { - return fileDescriptors_; - } - public void setFileDescriptors(List<FileDescriptor> descriptors) { - fileDescriptors_ = descriptors; - } - public long getNumFileDescriptors() { - return fileDescriptors_ == null ? 0 : fileDescriptors_.size(); - } - - public boolean hasFileDescriptors() { return !fileDescriptors_.isEmpty(); } - - // Struct-style class for caching all the information we need to reconstruct an - // HMS-compatible Partition object, for use in RPCs to the metastore. We do this rather - // than cache the Thrift partition object itself as the latter can be large - thanks - // mostly to the inclusion of the full FieldSchema list. This class is read-only - if - // any field can be mutated by Impala it should belong to HdfsPartition itself (see - // HdfsPartition.location_ for an example). - // - // TODO: Cache this descriptor in HdfsTable so that identical descriptors are shared - // between HdfsPartition instances. - // TODO: sdInputFormat and sdOutputFormat can be mutated by Impala when the file format - // of a partition changes; move these fields to HdfsPartition. 
- private static class CachedHmsPartitionDescriptor { - public String sdInputFormat; - public String sdOutputFormat; - public final boolean sdCompressed; - public final int sdNumBuckets; - public final org.apache.hadoop.hive.metastore.api.SerDeInfo sdSerdeInfo; - public final List<String> sdBucketCols; - public final List<org.apache.hadoop.hive.metastore.api.Order> sdSortCols; - public final Map<String, String> sdParameters; - public final int msCreateTime; - public final int msLastAccessTime; - - public CachedHmsPartitionDescriptor( - org.apache.hadoop.hive.metastore.api.Partition msPartition) { - org.apache.hadoop.hive.metastore.api.StorageDescriptor sd = null; - if (msPartition != null) { - sd = msPartition.getSd(); - msCreateTime = msPartition.getCreateTime(); - msLastAccessTime = msPartition.getLastAccessTime(); - } else { - msCreateTime = msLastAccessTime = 0; - } - if (sd != null) { - sdInputFormat = sd.getInputFormat(); - sdOutputFormat = sd.getOutputFormat(); - sdCompressed = sd.isCompressed(); - sdNumBuckets = sd.getNumBuckets(); - sdSerdeInfo = sd.getSerdeInfo(); - sdBucketCols = ImmutableList.copyOf(sd.getBucketCols()); - sdSortCols = ImmutableList.copyOf(sd.getSortCols()); - sdParameters = ImmutableMap.copyOf(sd.getParameters()); - } else { - sdInputFormat = ""; - sdOutputFormat = ""; - sdCompressed = false; - sdNumBuckets = 0; - sdSerdeInfo = null; - sdBucketCols = ImmutableList.of(); - sdSortCols = ImmutableList.of(); - sdParameters = ImmutableMap.of(); - } - } - } - - private final CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_; - - public CachedHmsPartitionDescriptor getCachedMsPartitionDescriptor() { - return cachedMsPartitionDescriptor_; - } - - /** - * Returns a Hive-compatible partition object that may be used in calls to the - * metastore. - */ - public org.apache.hadoop.hive.metastore.api.Partition toHmsPartition() { - if (cachedMsPartitionDescriptor_ == null) return null; - Preconditions.checkNotNull(table_.getNonPartitionFieldSchemas()); - // Update the serde library class based on the currently used file format. 
- org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = - new org.apache.hadoop.hive.metastore.api.StorageDescriptor( - table_.getNonPartitionFieldSchemas(), - getLocation(), - cachedMsPartitionDescriptor_.sdInputFormat, - cachedMsPartitionDescriptor_.sdOutputFormat, - cachedMsPartitionDescriptor_.sdCompressed, - cachedMsPartitionDescriptor_.sdNumBuckets, - cachedMsPartitionDescriptor_.sdSerdeInfo, - cachedMsPartitionDescriptor_.sdBucketCols, - cachedMsPartitionDescriptor_.sdSortCols, - cachedMsPartitionDescriptor_.sdParameters); - org.apache.hadoop.hive.metastore.api.Partition partition = - new org.apache.hadoop.hive.metastore.api.Partition( - getPartitionValuesAsStrings(true), getTable().getDb().getName(), - getTable().getName(), cachedMsPartitionDescriptor_.msCreateTime, - cachedMsPartitionDescriptor_.msLastAccessTime, storageDescriptor, - getParameters()); - return partition; - } - - private HdfsPartition(HdfsTable table, - org.apache.hadoop.hive.metastore.api.Partition msPartition, - List<LiteralExpr> partitionKeyValues, - HdfsStorageDescriptor fileFormatDescriptor, - Collection<HdfsPartition.FileDescriptor> fileDescriptors, long id, - HdfsPartitionLocationCompressor.Location location, TAccessLevel accessLevel) { - table_ = table; - if (msPartition == null) { - cachedMsPartitionDescriptor_ = null; - } else { - cachedMsPartitionDescriptor_ = new CachedHmsPartitionDescriptor(msPartition); - } - location_ = location; - partitionKeyValues_ = ImmutableList.copyOf(partitionKeyValues); - fileDescriptors_ = ImmutableList.copyOf(fileDescriptors); - fileFormatDescriptor_ = fileFormatDescriptor; - id_ = id; - accessLevel_ = accessLevel; - if (msPartition != null && msPartition.getParameters() != null) { - isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId( - msPartition.getParameters()) != null; - hmsParameters_ = msPartition.getParameters(); - } else { - hmsParameters_ = Maps.newHashMap(); - } - - // TODO: instead of raising an exception, we should consider marking this partition - // invalid and moving on, so that table loading won't fail and user can query other - // partitions. - for (FileDescriptor fileDescriptor: fileDescriptors_) { - StringBuilder errorMsg = new StringBuilder(); - if (!getInputFormatDescriptor().getFileFormat().isFileCompressionTypeSupported( - fileDescriptor.getFileName(), errorMsg)) { - throw new RuntimeException(errorMsg.toString()); - } - } - } - - public HdfsPartition(HdfsTable table, - org.apache.hadoop.hive.metastore.api.Partition msPartition, - List<LiteralExpr> partitionKeyValues, - HdfsStorageDescriptor fileFormatDescriptor, - Collection<HdfsPartition.FileDescriptor> fileDescriptors, - TAccessLevel accessLevel) { - this(table, msPartition, partitionKeyValues, fileFormatDescriptor, fileDescriptors, - partitionIdCounter_.getAndIncrement(), - table.getPartitionLocationCompressor().new Location(msPartition != null - ? 
msPartition.getSd().getLocation() - : table.getLocation()), - accessLevel); - } - - public static HdfsPartition defaultPartition( - HdfsTable table, HdfsStorageDescriptor storageDescriptor) { - List<LiteralExpr> emptyExprList = Lists.newArrayList(); - List<FileDescriptor> emptyFileDescriptorList = Lists.newArrayList(); - return new HdfsPartition(table, null, emptyExprList, - storageDescriptor, emptyFileDescriptorList, - ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID, null, - TAccessLevel.READ_WRITE); - } - - /** - * Return the size (in bytes) of all the files inside this partition - */ - public long getSize() { - long result = 0; - for (HdfsPartition.FileDescriptor fileDescriptor: fileDescriptors_) { - result += fileDescriptor.getFileLength(); - } - return result; - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("fileDescriptors", fileDescriptors_) - .toString(); - } - - private static Predicate<String> isIncrementalStatsKey = new Predicate<String>() { - @Override - public boolean apply(String key) { - return !(key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_NUM_CHUNKS) - || key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_CHUNK_PREFIX)); - } - }; - - /** - * Returns hmsParameters_ after filtering out all the partition - * incremental stats information. - */ - private Map<String, String> getFilteredHmsParameters() { - return Maps.filterKeys(hmsParameters_, isIncrementalStatsKey); - } - - public static HdfsPartition fromThrift(HdfsTable table, - long id, THdfsPartition thriftPartition) { - HdfsStorageDescriptor storageDesc = new HdfsStorageDescriptor(table.getName(), - HdfsFileFormat.fromThrift(thriftPartition.getFileFormat()), - thriftPartition.lineDelim, - thriftPartition.fieldDelim, - thriftPartition.collectionDelim, - thriftPartition.mapKeyDelim, - thriftPartition.escapeChar, - (byte) '"', // TODO: We should probably add quoteChar to THdfsPartition. - thriftPartition.blockSize); - - List<LiteralExpr> literalExpr = Lists.newArrayList(); - if (id != ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) { - List<Column> clusterCols = Lists.newArrayList(); - for (int i = 0; i < table.getNumClusteringCols(); ++i) { - clusterCols.add(table.getColumns().get(i)); - } - - List<TExprNode> exprNodes = Lists.newArrayList(); - for (TExpr expr: thriftPartition.getPartitionKeyExprs()) { - for (TExprNode node: expr.getNodes()) { - exprNodes.add(node); - } - } - Preconditions.checkState(clusterCols.size() == exprNodes.size(), - String.format("Number of partition columns (%d) does not match number " + - "of partition key expressions (%d)", - clusterCols.size(), exprNodes.size())); - - for (int i = 0; i < exprNodes.size(); ++i) { - literalExpr.add(LiteralExpr.fromThrift( - exprNodes.get(i), clusterCols.get(i).getType())); - } - } - - List<HdfsPartition.FileDescriptor> fileDescriptors = Lists.newArrayList(); - if (thriftPartition.isSetFile_desc()) { - for (THdfsFileDesc desc: thriftPartition.getFile_desc()) { - fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc)); - } - } - - TAccessLevel accessLevel = thriftPartition.isSetAccess_level() ? - thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE; - HdfsPartitionLocationCompressor.Location location = thriftPartition.isSetLocation() - ? 
table.getPartitionLocationCompressor().new Location( - thriftPartition.getLocation()) - : null; - HdfsPartition partition = new HdfsPartition(table, null, literalExpr, storageDesc, - fileDescriptors, id, location, accessLevel); - if (thriftPartition.isSetStats()) { - partition.setNumRows(thriftPartition.getStats().getNum_rows()); - } - if (thriftPartition.isSetIs_marked_cached()) { - partition.isMarkedCached_ = thriftPartition.isIs_marked_cached(); - } - - if (thriftPartition.isSetHms_parameters()) { - partition.hmsParameters_ = thriftPartition.getHms_parameters(); - } else { - partition.hmsParameters_ = Maps.newHashMap(); - } - - return partition; - } - - /** - * Checks that this partition's metadata is well formed. This does not necessarily - * mean the partition is supported by Impala. - * Throws a CatalogException if there are any errors in the partition metadata. - */ - public void checkWellFormed() throws CatalogException { - try { - // Validate all the partition key/values to ensure you can convert them toThrift() - Expr.treesToThrift(getPartitionValues()); - } catch (Exception e) { - throw new CatalogException("Partition (" + getPartitionName() + - ") has invalid partition column values: ", e); - } - } - - public THdfsPartition toThrift(boolean includeFileDesc, - boolean includeIncrementalStats) { - List<TExpr> thriftExprs = Expr.treesToThrift(getPartitionValues()); - - THdfsPartition thriftHdfsPart = new THdfsPartition( - fileFormatDescriptor_.getLineDelim(), - fileFormatDescriptor_.getFieldDelim(), - fileFormatDescriptor_.getCollectionDelim(), - fileFormatDescriptor_.getMapKeyDelim(), - fileFormatDescriptor_.getEscapeChar(), - fileFormatDescriptor_.getFileFormat().toThrift(), thriftExprs, - fileFormatDescriptor_.getBlockSize()); - if (location_ != null) thriftHdfsPart.setLocation(location_.toThrift()); - thriftHdfsPart.setStats(new TTableStats(numRows_)); - thriftHdfsPart.setAccess_level(accessLevel_); - thriftHdfsPart.setIs_marked_cached(isMarkedCached_); - thriftHdfsPart.setId(getId()); - thriftHdfsPart.setHms_parameters( - includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters()); - if (includeFileDesc) { - // Add block location information - for (FileDescriptor fd: fileDescriptors_) { - thriftHdfsPart.addToFile_desc(fd.toThrift()); - } - } - - return thriftHdfsPart; - } - - /** - * Comparison method to allow ordering of HdfsPartitions by their partition-key values. 
- */ - @Override - public int compareTo(HdfsPartition o) { - return comparePartitionKeyValues(partitionKeyValues_, o.getPartitionValues()); - } - - @VisibleForTesting - public static int comparePartitionKeyValues(List<LiteralExpr> lhs, - List<LiteralExpr> rhs) { - int sizeDiff = lhs.size() - rhs.size(); - if (sizeDiff != 0) return sizeDiff; - for(int i = 0; i < lhs.size(); ++i) { - int cmp = lhs.get(i).compareTo(rhs.get(i)); - if (cmp != 0) return cmp; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java deleted file mode 100644 index b72b846..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java +++ /dev/null @@ -1,153 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import com.cloudera.impala.common.Pair; -import com.cloudera.impala.thrift.THdfsPartitionLocation; -import com.cloudera.impala.util.ListMap; -import com.google.common.base.Preconditions; - -/** - * Utility class for storing HdfsPartition locations in a comrpessed format. Each - * instance of this class is owned by a single HdfsTable instance. - * - * This class is not thread-safe by itself since it is only modified when the lock on an - * HdfsTable object is held. - * - * TODO: Generalize this to compress other sets of Strings that are likely to share common - * prefixes, like table locations. - * - */ -class HdfsPartitionLocationCompressor { - int numClusteringColumns_; - - // A bi-directional map between partition location prefixes and their compressed - // representation, an int. - final private ListMap<String> prefixMap_ = new ListMap<String>(); - - public HdfsPartitionLocationCompressor(int numClusteringColumns) { - numClusteringColumns_ = numClusteringColumns; - } - - // Construct an HdfsPartitionLocationCompressor with a pre-filled bidirectional map - // (indexToPrefix_, prefixToIndex_). 
- public HdfsPartitionLocationCompressor( - int numClusteringColumns, ArrayList<String> prefixes) { - numClusteringColumns_ = numClusteringColumns; - prefixMap_.populate(prefixes); - } - - public void setClusteringColumns(int numClusteringColumns) { - numClusteringColumns_ = numClusteringColumns; - } - - public List<String> getPrefixes() { - return prefixMap_.getList(); - } - - // One direction of the map: returns the prefix associated with an index, or "" is the - // index is -1. Indexes less than -1 or greater than indexToPrefix_.size()-1 are invalid - // and casue and IllegalArgumentException to be thrown. - private String indexToPrefix(int i) { - // Uncompressed location are represented by -1: - if (i == -1) return ""; - Preconditions.checkElementIndex(i, prefixMap_.size()); - return prefixMap_.getEntry(i); - } - - // Compress a location prefix, adding it to the bidirectional map (indexToPrefix_, - // prefixToIndex_) if it is not already present. - private int prefixToIndex(String s) { - return prefixMap_.getIndex(s); - } - - // A surrogate for THdfsPartitionLocation, which represents a partition's location - // relative to its parent table's list of partition prefixes. - public class Location { - // 'prefix_index_' represents the portion of the partition's location that comes before - // the last N directories, where N is the number of partitioning columns. - // 'prefix_index_' is an index into - // HdfsPartitionLocationCompressor.this.indexToPrefix_. 'suffix_' is the rest of the - // partition location. - // - // TODO: Since each partition stores the literal values for the partitioning columns, - // we could also elide the column names and values from suffix_ when a partition is in - // the canonical location "/partitioning_column_name_1=value_1/..." - private final int prefix_index_; - private final String suffix_; - - public Location(String location) { - Preconditions.checkNotNull(location); - Pair<String,String> locationParts = decompose(location); - prefix_index_ = - HdfsPartitionLocationCompressor.this.prefixToIndex(locationParts.first); - suffix_ = locationParts.second; - } - - public Location(THdfsPartitionLocation thrift) { - Preconditions.checkNotNull(thrift); - prefix_index_ = thrift.prefix_index; - suffix_ = thrift.getSuffix(); - } - - public THdfsPartitionLocation toThrift() { - return new THdfsPartitionLocation(prefix_index_, suffix_); - } - - @Override - public String toString() { - return HdfsPartitionLocationCompressor.this.indexToPrefix(prefix_index_) + suffix_; - } - - @Override - public int hashCode() { return toString().hashCode(); } - - @Override - public boolean equals(Object obj) { - return (obj instanceof Location) && (toString() == obj.toString()); - } - - // Decompose a location string by removing its last N directories, where N is the - // number of clustering columns. The result is a Pair<String,String> where the first - // String is the prefix and the second is the suffix. (In orther words, their - // concatenation equals the input.) If the input does not have at least N '/' - // characters, the prefix is empty and the suffix is the entire input. - private Pair<String,String> decompose(String s) { - Preconditions.checkNotNull(s); - int numClusteringColumns = - HdfsPartitionLocationCompressor.this.numClusteringColumns_; - if (numClusteringColumns == 0) return new Pair<String,String>(s, ""); - // Iterate backwards over the input until we have passed 'numClusteringColumns' - // directories. What is left is the prefix. 
- int i = s.length() - 1; - // If the string ends in '/', iterating past it does not pass a clustering column. - if (i >= 0 && s.charAt(i) == '/') --i; - for (; numClusteringColumns > 0 && i >= 0; --i) { - if (s.charAt(i) == '/') --numClusteringColumns; - } - // If we successfully removed all the partition directories, s.charAt(i+1) is '/' - // and we can include it in the prefix. - if (0 == numClusteringColumns) ++i; - return new Pair<String,String>(s.substring(0, i + 1), s.substring(i + 1)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java deleted file mode 100644 index f018ce3..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java +++ /dev/null @@ -1,240 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; - -/** - * Represents the file format metadata for files stored in a table or partition. - */ -public class HdfsStorageDescriptor { - public static final char DEFAULT_LINE_DELIM = '\n'; - // hive by default uses ctrl-a as field delim - public static final char DEFAULT_FIELD_DELIM = '\u0001'; - // hive by default has no escape char - public static final char DEFAULT_ESCAPE_CHAR = '\u0000'; - - // Serde parameters that are recognized by table writers. - private static final String BLOCK_SIZE = "blocksize"; - private static final String COMPRESSION = "compression"; - - // Important: don't change the ordering of these keys - if e.g. FIELD_DELIM is not - // found, the value of LINE_DELIM is used, so LINE_DELIM must be found first. - // Package visible for testing. - final static List<String> DELIMITER_KEYS = ImmutableList.of( - serdeConstants.LINE_DELIM, serdeConstants.FIELD_DELIM, - serdeConstants.COLLECTION_DELIM, serdeConstants.MAPKEY_DELIM, - serdeConstants.ESCAPE_CHAR, serdeConstants.QUOTE_CHAR); - - // The Parquet serde shows up multiple times as the location of the implementation - // has changed between Impala versions. 
- final static List<String> COMPATIBLE_SERDES = ImmutableList.of( - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", // (seq / text / parquet) - "org.apache.hadoop.hive.serde2.avro.AvroSerDe", // (avro) - "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", // (rc) - "parquet.hive.serde.ParquetHiveSerDe", // (parquet - legacy) - // TODO: Verify the following Parquet SerDe works with Impala and add - // support for the new input/output format classes. See CDH-17085. - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); // (parquet) - - private final static Logger LOG = LoggerFactory.getLogger(HdfsStorageDescriptor.class); - - private HdfsFileFormat fileFormat_; - private final byte lineDelim_; - private final byte fieldDelim_; - private final byte collectionDelim_; - private final byte mapKeyDelim_; - private final byte escapeChar_; - private final byte quoteChar_; - private final int blockSize_; - - public void setFileFormat(HdfsFileFormat fileFormat) { - fileFormat_ = fileFormat; - } - - /** - * Returns a map from delimiter key to a single delimiter character, - * filling in defaults if explicit values are not found in the supplied - * serde descriptor. - * - * @throws InvalidStorageDescriptorException - if an invalid delimiter is found - */ - private static Map<String, Byte> extractDelimiters(SerDeInfo serdeInfo) - throws InvalidStorageDescriptorException { - // The metastore may return null for delimiter parameters, - // which means we need to use a default instead. - // We tried long and hard to find default values for delimiters in Hive, - // but could not find them. - Map<String, Byte> delimMap = Maps.newHashMap(); - - for (String delimKey: DELIMITER_KEYS) { - String delimValue = serdeInfo.getParameters().get(delimKey); - if (delimValue == null) { - if (delimKey.equals(serdeConstants.FIELD_DELIM)) { - delimMap.put(delimKey, (byte) DEFAULT_FIELD_DELIM); - } else if (delimKey.equals(serdeConstants.ESCAPE_CHAR)) { - delimMap.put(delimKey, (byte) DEFAULT_ESCAPE_CHAR); - } else if (delimKey.equals(serdeConstants.LINE_DELIM)) { - delimMap.put(delimKey, (byte) DEFAULT_LINE_DELIM); - } else { - delimMap.put(delimKey, delimMap.get(serdeConstants.FIELD_DELIM)); - } - } else { - Byte delimByteValue = parseDelim(delimValue); - if (delimByteValue == null) { - throw new InvalidStorageDescriptorException("Invalid delimiter: '" + - delimValue + "'. Delimiter must be specified as a single character or " + - "as a decimal value in the range [-128:127]"); - } - delimMap.put(delimKey, parseDelim(delimValue)); - } - } - return delimMap; - } - - /** - * Parses a delimiter in a similar way as Hive, with some additional error checking. - * A delimiter must fit in a single byte and can be specified in the following - * formats, as far as I can tell (there isn't documentation): - * - A single ASCII or unicode character (ex. '|') - * - An escape character in octal format (ex. \001. Stored in the metastore as a - * unicode character: \u0001). - * - A signed decimal integer in the range [-128:127]. Used to support delimiters - * for ASCII character values between 128-255 (-2 maps to ASCII 254). - * - * The delimiter is first parsed as a decimal number. If the parsing succeeds AND - * the resulting value fits in a signed byte, the byte value of the parsed int is - * returned. Otherwise, if the string has a single char, the byte value of this - * char is returned. - * If the delimiter is invalid, null will be returned. 
- */ - public static Byte parseDelim(String delimVal) { - Preconditions.checkNotNull(delimVal); - try { - // In the future we could support delimiters specified in hex format, but we would - // need support from the Hive side. - return Byte.parseByte(delimVal); - } catch (NumberFormatException e) { - if (delimVal.length() == 1) return (byte) delimVal.charAt(0); - } - return null; - } - - public HdfsStorageDescriptor(String tblName, HdfsFileFormat fileFormat, byte lineDelim, - byte fieldDelim, byte collectionDelim, byte mapKeyDelim, byte escapeChar, - byte quoteChar, int blockSize) { - this.fileFormat_ = fileFormat; - this.lineDelim_ = lineDelim; - this.fieldDelim_ = fieldDelim; - this.collectionDelim_ = collectionDelim; - this.mapKeyDelim_ = mapKeyDelim; - this.quoteChar_ = quoteChar; - this.blockSize_ = blockSize; - - // You can set the escape character as a tuple or row delim. Empirically, - // this is ignored by hive. - if (escapeChar == fieldDelim || - escapeChar == lineDelim || - escapeChar == collectionDelim) { - // TODO: we should output the table name here but it's hard to get to now. - this.escapeChar_ = DEFAULT_ESCAPE_CHAR; - LOG.warn("Escape character for table, " + tblName + " is set to " - + "the same character as one of the delimiters. Ignoring escape character."); - } else { - this.escapeChar_ = escapeChar; - } - } - - /** - * Thrown when constructing an HdfsStorageDescriptor from an invalid/unsupported - * metastore storage descriptor. - * TODO: Get rid of this class. - */ - public static class InvalidStorageDescriptorException extends CatalogException { - // Mandatory since Exception implements Serialisable - private static final long serialVersionUID = -555234913768134760L; - public InvalidStorageDescriptorException(String s) { super(s); } - public InvalidStorageDescriptorException(Exception ex) { - super(ex.getMessage(), ex); - } - public InvalidStorageDescriptorException(String msg, Throwable cause) { - super(msg, cause); - } - } - - /** - * Constructs a new HdfsStorageDescriptor from a StorageDescriptor retrieved from the - * metastore. - * - * @throws InvalidStorageDescriptorException - if the storage descriptor has invalid - * delimiters, an unsupported SerDe, or an unknown file format. - */ - public static HdfsStorageDescriptor fromStorageDescriptor(String tblName, - StorageDescriptor sd) - throws InvalidStorageDescriptorException { - Map<String, Byte> delimMap = extractDelimiters(sd.getSerdeInfo()); - if (!COMPATIBLE_SERDES.contains(sd.getSerdeInfo().getSerializationLib())) { - throw new InvalidStorageDescriptorException(String.format("Impala does not " + - "support tables of this type. REASON: SerDe library '%s' is not " + - "supported.", sd.getSerdeInfo().getSerializationLib())); - } - // Extract the blocksize and compression specification from the SerDe parameters, - // if present. 
- Map<String, String> parameters = sd.getSerdeInfo().getParameters(); - int blockSize = 0; - String blockValue = parameters.get(BLOCK_SIZE); - if (blockValue != null) { - blockSize = Integer.parseInt(blockValue); - } - - try { - return new HdfsStorageDescriptor(tblName, - HdfsFileFormat.fromJavaClassName(sd.getInputFormat()), - delimMap.get(serdeConstants.LINE_DELIM), - delimMap.get(serdeConstants.FIELD_DELIM), - delimMap.get(serdeConstants.COLLECTION_DELIM), - delimMap.get(serdeConstants.MAPKEY_DELIM), - delimMap.get(serdeConstants.ESCAPE_CHAR), - delimMap.get(serdeConstants.QUOTE_CHAR), - blockSize); - } catch (IllegalArgumentException ex) { - // Thrown by fromJavaClassName - throw new InvalidStorageDescriptorException(ex); - } - } - - public byte getLineDelim() { return lineDelim_; } - public byte getFieldDelim() { return fieldDelim_; } - public byte getCollectionDelim() { return collectionDelim_; } - public byte getMapKeyDelim() { return mapKeyDelim_; } - public byte getEscapeChar() { return escapeChar_; } - public byte getQuoteChar() { return quoteChar_; } - public HdfsFileFormat getFileFormat() { return fileFormat_; } - public int getBlockSize() { return blockSize_; } -}
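[Editorial aside, not part of the patch: a short sketch of the parseDelim() rules documented in the removed HdfsStorageDescriptor.java, assuming the class were still available. The string is first tried as a signed decimal byte; failing that, a single character is taken at its byte value; anything else yields null.]

import com.cloudera.impala.catalog.HdfsStorageDescriptor;

public class ParseDelimSketch {
  public static void main(String[] args) {
    // Single character: returned as its byte value.
    System.out.println(HdfsStorageDescriptor.parseDelim("|"));   // 124
    // Signed decimal in [-128, 127]: -2 stands for ASCII 254.
    System.out.println(HdfsStorageDescriptor.parseDelim("-2"));  // -2
    // Neither form: null, which callers turn into an invalid-delimiter error.
    System.out.println(HdfsStorageDescriptor.parseDelim("||"));  // null
  }
}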
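[Editorial aside, not part of the patch: the partition-location compression deleted in HdfsPartitionLocationCompressor.java splits each location into a shared prefix and a per-partition suffix by stripping the last N directories, where N is the number of clustering columns. The standalone sketch below re-implements the private Location.decompose() helper to show that split; the example path is hypothetical.]

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class DecomposeSketch {
  // Mirrors Location.decompose(): keep everything up to and including the '/' that
  // precedes the last numClusteringColumns directories; the rest is the suffix.
  static Map.Entry<String, String> decompose(String s, int numClusteringColumns) {
    if (numClusteringColumns == 0) return new SimpleEntry<>(s, "");
    int i = s.length() - 1;
    // A trailing '/' does not count as passing a clustering-column directory.
    if (i >= 0 && s.charAt(i) == '/') --i;
    for (; numClusteringColumns > 0 && i >= 0; --i) {
      if (s.charAt(i) == '/') --numClusteringColumns;
    }
    // If all partition directories were passed, include the final '/' in the prefix.
    if (numClusteringColumns == 0) ++i;
    return new SimpleEntry<>(s.substring(0, i + 1), s.substring(i + 1));
  }

  public static void main(String[] args) {
    Map.Entry<String, String> parts =
        decompose("/warehouse/tbl/year=2012/month=2", 2);
    System.out.println(parts.getKey());    // /warehouse/tbl/
    System.out.println(parts.getValue());  // year=2012/month=2
  }
}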