Repository: hive Updated Branches: refs/heads/master baf32e2e2 -> 01580af2e
http://git-wip-us.apache.org/repos/asf/hive/blob/01580af2/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index f4f30d7..f4df2e2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.metastore.hbase; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; - import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,10 +73,8 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; -import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -847,15 +844,67 @@ class HBaseUtils { } /** + * Deserialize a partition key when you know nothing about it. That is, you do not know what + * dbname, tablename it came from. + * @param key the key fetched from HBase + * @param callback A reference to the calling HBaseReadWrite object. This has to be done as a + * callback because we have to first deserialize the database name and table + * name, and then fetch the table information, and then we will know how to + * deserialize the rest of the key. 
+ * @return a list that includes the dbname, tablename, and partition values + * @throws IOException + */ + static List<String> deserializePartitionKey(byte[] key, HBaseReadWrite callback) + throws IOException { + List<String> keyParts = + desierliazeDbNameTableNameFromPartitionKey(key, callback.getConf()); + Table table = callback.getTable(keyParts.get(0), keyParts.get(1)); + keyParts.addAll(deserializePartitionKey(table.getPartitionKeys(), key, callback.getConf())); + return keyParts; + } + + /** * Deserialize a partition. This version should be used when the partition key is not already - * known (eg a scan). + * known and the database and table name are not known either (eg a full scan). Because the + * dbname and tablename (and thus the partition columns) are not known a priori this version + * has to go fetch the table after it figures out which table. If you already have the table + * object you should use + * {@link #deserializePartition(String,String,List,byte[],byte[],Configuration)} * @param key the key fetched from HBase * @param serialized the value fetched from HBase + * @param callback A reference to the calling HBaseReadWrite object. This has to be done as a + * callback because we have to first deserialize the database name and table + * name, and then fetch the table information, and then we will know how to + * deserialize the rest of the key. 
* @return A struct that contains the partition plus parts of the storage descriptor */ - static StorageDescriptorParts deserializePartition(String dbName, String tableName, List<FieldSchema> partitions, - byte[] key, byte[] serialized, Configuration conf) throws InvalidProtocolBufferException { - List keys = deserializePartitionKey(partitions, key, conf); + static StorageDescriptorParts deserializePartition(byte[] key, byte[] serialized, + HBaseReadWrite callback) + throws IOException { + List<String> dbNameTableName = + desierliazeDbNameTableNameFromPartitionKey(key, callback.getConf()); + Table table = callback.getTable(dbNameTableName.get(0), dbNameTableName.get(1)); + List<String> keys = deserializePartitionKey(table.getPartitionKeys(), key, callback.getConf()); + return deserializePartition(dbNameTableName.get(0), dbNameTableName.get(1), keys, serialized); + } + + /** + * Deserialize a partition. This version should be used when you know the dbname and tablename + * but not the partition values. 
+ * @param dbName database this partition is in + * @param tableName table this partition is in + * @param partitions schemas for the partition columns of this table + * @param key key fetched from HBase + * @param serialized serialized version of the partition + * @param conf configuration file + * @return + * @throws InvalidProtocolBufferException + */ + static StorageDescriptorParts deserializePartition(String dbName, String tableName, + List<FieldSchema> partitions, byte[] key, + byte[] serialized, Configuration conf) + throws InvalidProtocolBufferException { + List<String> keys = deserializePartitionKey(partitions, key, conf); return deserializePartition(dbName, tableName, keys, serialized); } @@ -887,12 +936,46 @@ class HBaseUtils { return sdParts; } - private static String[] deserializeKey(byte[] key) { + static String[] deserializeKey(byte[] key) { String k = new String(key, ENCODING); return k.split(KEY_SEPARATOR_STR); } - static List<String> deserializePartitionKey(List<FieldSchema> partitions, byte[] key, + private static List<String> desierliazeDbNameTableNameFromPartitionKey(byte[] key, + Configuration conf) { + StringBuffer names = new StringBuffer(); + names.append("dbName,tableName,"); + StringBuffer types = new StringBuffer(); + types.append("string,string,"); + BinarySortableSerDe serDe = new BinarySortableSerDe(); + Properties props = new Properties(); + props.setProperty(serdeConstants.LIST_COLUMNS, names.toString()); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString()); + try { + serDe.initialize(conf, props); + List deserializedkeys = ((List)serDe.deserialize(new BytesWritable(key))).subList(0, 2); + List<String> keys = new ArrayList<>(); + for (int i=0;i<deserializedkeys.size();i++) { + Object deserializedKey = deserializedkeys.get(i); + if (deserializedKey==null) { + throw new RuntimeException("Can't have a null dbname or tablename"); + } else { + TypeInfo inputType = TypeInfoUtils.getTypeInfoFromTypeString("string"); + 
ObjectInspector inputOI = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(inputType); + Converter converter = ObjectInspectorConverters.getConverter(inputOI, + PrimitiveObjectInspectorFactory.javaStringObjectInspector); + keys.add((String) converter.convert(deserializedKey)); + } + } + return keys; + } catch (SerDeException e) { + throw new RuntimeException("Error when deserialize key", e); + } + } + + // Deserialize a partition key and return _only_ the partition values. + private static List<String> deserializePartitionKey(List<FieldSchema> partitions, byte[] key, Configuration conf) { StringBuffer names = new StringBuffer(); names.append("dbName,tableName,"); @@ -911,7 +994,7 @@ class HBaseUtils { props.setProperty(serdeConstants.LIST_COLUMNS, names.toString()); props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString()); try { - serDe.initialize(new Configuration(), props); + serDe.initialize(conf, props); List deserializedkeys = ((List)serDe.deserialize(new BytesWritable(key))).subList(2, partitions.size()+2); List<String> partitionKeys = new ArrayList<String>(); for (int i=0;i<deserializedkeys.size();i++) {