Repository: hive Updated Branches: refs/heads/master bb113e553 -> c30771639
HIVE-12927 HBase metastore: sequences should be one per row, not all in one row (Alan Gates reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c3077163 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c3077163 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c3077163 Branch: refs/heads/master Commit: c307716392754169451954b77353a3aec3fc21ef Parents: bb113e5 Author: Alan Gates <[email protected]> Authored: Thu Feb 18 09:58:16 2016 -0800 Committer: Alan Gates <[email protected]> Committed: Thu Feb 18 09:58:16 2016 -0800 ---------------------------------------------------------------------- .../metastore/hbase/TestHBaseSchemaTool.java | 2 +- .../hive/metastore/hbase/HBaseReadWrite.java | 58 ++++++++++---------- 2 files changed, 30 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c3077163/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java index 79c9e08..e5833b8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java @@ -577,6 +577,6 @@ public class TestHBaseSchemaTool extends HBaseIntegrationTests { outStr = new ByteArrayOutputStream(); out = new PrintStream(outStr); tool.go(false, HBaseReadWrite.SEQUENCES_TABLE, null, "whatever", conf, out, err); - Assert.assertEquals("mk: 1" + lsep, outStr.toString()); + Assert.assertEquals("cv_PERMANENT_FUNCTION: 3\nmaster_key: 1" + lsep, outStr.toString()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c3077163/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index 61257f0..7ed825f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -101,7 +100,6 @@ public class HBaseReadWrite implements MetadataStore { final static String TABLE_TABLE = "HBMS_TBLS"; final static String USER_TO_ROLE_TABLE = "HBMS_USER_TO_ROLE"; final static String FILE_METADATA_TABLE = "HBMS_FILE_METADATA"; - final static String CHANGE_VERSION_TABLE = "HBMS_CHANGE_VERSION"; final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING); final static byte[] STATS_CF = "s".getBytes(HBaseUtils.ENCODING); final static String NO_CACHE_CONF = "no.use.cache"; @@ -111,7 +109,7 @@ public class HBaseReadWrite implements MetadataStore { public final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, SECURITY_TABLE, SEQUENCES_TABLE, - TABLE_TABLE, FILE_METADATA_TABLE, CHANGE_VERSION_TABLE }; + TABLE_TABLE, FILE_METADATA_TABLE }; public final static Map<String, List<byte[]>> columnFamilies = new HashMap<> (tableNames.length); static { @@ -128,20 +126,22 @@ public class HBaseReadWrite implements MetadataStore { columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); // Stats CF will contain PPD stats. columnFamilies.put(FILE_METADATA_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); - columnFamilies.put(CHANGE_VERSION_TABLE, Arrays.asList(CATALOG_CF)); } + final static byte[] MASTER_KEY_SEQUENCE = "master_key".getBytes(HBaseUtils.ENCODING); + // The change version functionality uses the sequences table, but we don't want to give the + // caller complete control over the sequence name as they might inadvertently clash with one of + // our sequence keys, so add a prefix to their topic name. + final static String CHANGE_VERSION_SEQUENCE_PREFIX = "cv_"; + final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING); private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING); - final static byte[] MASTER_KEY_SEQUENCE = "mk".getBytes(HBaseUtils.ENCODING); private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING); private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING); private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING); private final static byte[] DELEGATION_TOKEN_COL = "dt".getBytes(HBaseUtils.ENCODING); private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING); private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING); - private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING); - private final static byte[] CV_COL = "cv".getBytes(HBaseUtils.ENCODING); private final static int TABLES_TO_CACHE = 10; // False positives are very bad here because they cause us to invalidate entries we shouldn't. // Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase @@ -2146,23 +2146,16 @@ public class HBaseReadWrite implements MetadataStore { *********************************************************************************************/ public long getChangeVersion(String topic) throws IOException { - byte[] key = HBaseUtils.buildKey(topic); - byte[] result = read(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL); - return (result == null) ? -1 : Long.valueOf(new String(result, HBaseUtils.ENCODING)); + byte[] key = HBaseUtils.buildKey(CHANGE_VERSION_SEQUENCE_PREFIX + topic); + return peekAtSequence(key); } // TODO: The way this is called now is not ideal. It's all encapsulated and stuff, but, // before the txns (consistent HBase writes) are properly implemented, we should at least // put this in the same RPC with real updates. But there are no guarantees anyway, so... public void incrementChangeVersion(String topic) throws IOException { - byte[] key = HBaseUtils.buildKey(topic); - byte[] serialized = read(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL); - long val = 0; - if (serialized != null) { - val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); - } - store(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL, - new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING)); + byte[] key = HBaseUtils.buildKey(CHANGE_VERSION_SEQUENCE_PREFIX + topic); + getNextSequence(key); } /********************************************************************************************** @@ -2400,14 +2393,19 @@ public class HBaseReadWrite implements MetadataStore { * Sequence methods *********************************************************************************************/ + long peekAtSequence(byte[] sequence) throws IOException { + byte[] serialized = read(SEQUENCES_TABLE, sequence, CATALOG_CF, CATALOG_COL); + return serialized == null ? 0 : Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); + } + long getNextSequence(byte[] sequence) throws IOException { - byte[] serialized = read(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence); + byte[] serialized = read(SEQUENCES_TABLE, sequence, CATALOG_CF, CATALOG_COL); long val = 0; if (serialized != null) { val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); } byte[] incrSerialized = new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING); - store(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence, incrSerialized); + store(SEQUENCES_TABLE, sequence, CATALOG_CF, CATALOG_COL, incrSerialized); return val; } @@ -2418,16 +2416,18 @@ public class HBaseReadWrite implements MetadataStore { */ List<String> printSequences() throws IOException { HTableInterface htab = conn.getHBaseTable(SEQUENCES_TABLE); - Get g = new Get(SEQUENCES_KEY); - g.addFamily(CATALOG_CF); - Result result = htab.get(g); - if (result.isEmpty()) return Arrays.asList("No sequences"); - List<String> lines = new ArrayList<>(); - for (Map.Entry<byte[], byte[]> entry : result.getFamilyMap(CATALOG_CF).entrySet()) { - lines.add(new String(entry.getKey(), HBaseUtils.ENCODING) + ": " + - new String(entry.getValue(), HBaseUtils.ENCODING)); + Iterator<Result> iter = + scan(SEQUENCES_TABLE, CATALOG_CF, CATALOG_COL, null); + List<String> sequences = new ArrayList<>(); + if (!iter.hasNext()) return Arrays.asList("No sequences"); + while (iter.hasNext()) { + Result result = iter.next(); + sequences.add(new StringBuilder(new String(result.getRow(), HBaseUtils.ENCODING)) + .append(": ") + .append(new String(result.getValue(CATALOG_CF, CATALOG_COL), HBaseUtils.ENCODING)) + .toString()); } - return lines; + return sequences; } /**********************************************************************************************
