Repository: hive
Updated Branches:
  refs/heads/master 514ab795f -> 523830338


http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
new file mode 100644
index 0000000..89c3e7b
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hive.common.util.BloomFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Filter for scanning the aggregate stats table
+ */
+public class AggrStatsInvalidatorFilter extends FilterBase {
+  private static final Log LOG =
+      LogFactory.getLog(AggrStatsInvalidatorFilter.class.getName());
+  private final List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> entries;
+  private final long runEvery;
+  private final long maxCacheEntryLife;
+  // This class is not serializable, so transient doesn't strictly mean anything here.  It's just
+  // to communicate that we don't serialize this field and ship it across to the filter on the
+  // other end.
+  // We use the time the filter is actually instantiated in HBase.
+  private transient long now;
+
+  public static Filter parseFrom(byte[] serialized) throws DeserializationException {
+    try {
+      return new AggrStatsInvalidatorFilter(
+          HbaseMetastoreProto.AggrStatsInvalidatorFilter.parseFrom(serialized));
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * @param proto Protocol buffer representation of this filter.
+   */
+  AggrStatsInvalidatorFilter(HbaseMetastoreProto.AggrStatsInvalidatorFilter proto) {
+    this.entries = proto.getToInvalidateList();
+    this.runEvery = proto.getRunEvery();
+    this.maxCacheEntryLife = proto.getMaxCacheEntryLife();
+    now = System.currentTimeMillis();
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder()
+        .addAllToInvalidate(entries)
+        .setRunEvery(runEvery)
+        .setMaxCacheEntryLife(maxCacheEntryLife)
+        .build()
+        .toByteArray();
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    return false;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    // Is this the bloom filter column we want?
+    if (Arrays.equals(CellUtil.cloneQualifier(cell), HBaseReadWrite.AGGR_STATS_BLOOM_COL)) {
+      HbaseMetastoreProto.AggrStatsBloomFilter fromCol =
+          HbaseMetastoreProto.AggrStatsBloomFilter.parseFrom(CellUtil.cloneValue(cell));
+      BloomFilter bloom = null;
+      if (now - maxCacheEntryLife > fromCol.getAggregatedAt()) {
+        // It's too old, kill it regardless of whether we were asked to or not.
+        return ReturnCode.INCLUDE;
+      } else if (now - runEvery * 2 <= fromCol.getAggregatedAt()) {
+        // It's too new.  We might be stomping on something that was just created.  Skip it.
+        return ReturnCode.NEXT_ROW;
+      } else {
+        // Look through each of our entries and see if any of them match.
+        for (HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry entry : entries) {
+          // First check whether the db and table match
+          if (entry.getDbName().equals(fromCol.getDbName()) &&
+              entry.getTableName().equals(fromCol.getTableName())) {
+            if (bloom == null) {
+              // Now, reconstitute the bloom filter and probe it with each of our partition names
+              bloom = new BloomFilter(
+                  fromCol.getBloomFilter().getBitsList(),
+                  fromCol.getBloomFilter().getNumBits(),
+                  fromCol.getBloomFilter().getNumFuncs());
+            }
+            if (bloom.test(entry.getPartName().toByteArray())) {
+              // This is most likely a match, so mark it and quit looking.
+              return ReturnCode.INCLUDE;
+            }
+          }
+        }
+      }
+      return ReturnCode.NEXT_ROW;
+    } else {
+      return ReturnCode.NEXT_COL;
+    }
+  }
+}
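
For readers tracing filterKeyValue above: it applies three age-based rules before ever probing the bloom filter. A minimal standalone sketch of just that decision order (not part of the patch; the runEvery and maxCacheEntryLife values below are illustrative only):

    // Sketch only: mirrors the branch order in filterKeyValue above.
    public class AgeWindowSketch {
      // Returns "INCLUDE" (invalidate), "NEXT_ROW" (leave alone), or "CHECK_BLOOM".
      static String classify(long now, long aggregatedAt, long runEvery, long maxCacheEntryLife) {
        if (now - maxCacheEntryLife > aggregatedAt) {
          return "INCLUDE";      // older than the max entry life: always drop it
        } else if (now - runEvery * 2 <= aggregatedAt) {
          return "NEXT_ROW";     // too new: it may still be being written, skip it
        } else {
          return "CHECK_BLOOM";  // middle-aged: probe the bloom filter with each part name
        }
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Hypothetical values: invalidator runs every 5s, entries live at most 1h.
        System.out.println(classify(now, now - 7200000L, 5000L, 3600000L)); // INCLUDE
        System.out.println(classify(now, now - 1000L, 5000L, 3600000L));    // NEXT_ROW
        System.out.println(classify(now, now - 60000L, 5000L, 3600000L));   // CHECK_BLOOM
      }
    }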

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
index 6171fab..2359939 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hive.metastore.hbase;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A simple metric to count how many times something occurs.
  */
@@ -44,4 +46,8 @@ class Counter {
     return bldr.toString();
   }
 
+  @VisibleForTesting long getCnt() {
+    return cnt;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index fd6f9f5..332e30a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -37,9 +37,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hive.common.util.BloomFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.AggregateStatsCache;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
@@ -53,8 +51,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator;
-import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory;
+import org.apache.hive.common.util.BloomFilter;
 
 import java.io.IOException;
 import java.security.MessageDigest;
@@ -62,6 +59,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -75,6 +73,7 @@ import java.util.Set;
  */
 class HBaseReadWrite {
 
+  @VisibleForTesting final static String AGGR_STATS_TABLE = "HBMS_AGGR_STATS";
   @VisibleForTesting final static String DB_TABLE = "HBMS_DBS";
   @VisibleForTesting final static String FUNC_TABLE = "HBMS_FUNCS";
   @VisibleForTesting final static String GLOBAL_PRIVS_TABLE = "HBMS_GLOBAL_PRIVS";
@@ -89,12 +88,14 @@ class HBaseReadWrite {
   /**
    * List of tables in HBase
    */
-  final static String[] tableNames = { DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE,
-                                       USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, TABLE_TABLE  };
+  final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE,
+                                       PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE,
+                                       TABLE_TABLE  };
   final static Map<String, List<byte[]>> columnFamilies =
       new HashMap<String, List<byte[]>> (tableNames.length);
 
   static {
+    columnFamilies.put(AGGR_STATS_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(DB_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(FUNC_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(GLOBAL_PRIVS_TABLE, Arrays.asList(CATALOG_CF));
@@ -105,11 +106,21 @@ class HBaseReadWrite {
     columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
   }
 
-  private final static byte[] CATALOG_COL = "cat".getBytes(HBaseUtils.ENCODING);
+  /**
+   * Stores the bloom filter for the aggregated stats, to determine what partitions are in this
+   * aggregate.
+   */
+  final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING);
   private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING);
   private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING);
-  private final static byte[] GLOBAL_PRIVS_KEY = "globalprivs".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING);
   private final static int TABLES_TO_CACHE = 10;
+  // False positives are very bad here because they cause us to invalidate entries we shouldn't.
+  // Space used and the number of hash functions grow in proportion to ln of the error rate, so a
+  // 10x increase in accuracy only modestly increases the required space and hashing.
+  private final static double STATS_BF_ERROR_RATE = 0.001;
 
   @VisibleForTesting final static String TEST_CONN = "test_connection";
   private static HBaseConnection testConn;
@@ -135,7 +146,7 @@ class HBaseReadWrite {
   private ObjectCache<ObjectPair<String, String>, Table> tableCache;
   private ObjectCache<ByteArrayWrapper, StorageDescriptor> sdCache;
   private PartitionCache partCache;
-  private AggregateStatsCache aggrStatsCache;
+  private StatsCache statsCache;
   private Counter tableHits;
   private Counter tableMisses;
   private Counter tableOverflows;
@@ -239,8 +250,8 @@ class HBaseReadWrite {
       sdCache = new ObjectCache<ByteArrayWrapper, StorageDescriptor>(sdsCacheSize, sdHits,
           sdMisses, sdOverflows);
       partCache = new PartitionCache(totalCatalogObjectsToCache, partHits, partMisses, partOverflows);
-      aggrStatsCache = AggregateStatsCache.getInstance(conf);
     }
+    statsCache = StatsCache.getInstance(conf);
     roleCache = new HashMap<String, HbaseMetastoreProto.RoleGrantInfoList>();
     entireRoleTableInCache = false;
   }
@@ -252,14 +263,6 @@ class HBaseReadWrite {
         if (self.get().conn.getHBaseTable(name, true) == null) {
           List<byte[]> families = columnFamilies.get(name);
           self.get().conn.createHBaseTable(name, families);
-          /*
-          List<byte[]> columnFamilies = new ArrayList<byte[]>();
-          columnFamilies.add(CATALOG_CF);
-          if (TABLE_TABLE.equals(name) || PART_TABLE.equals(name)) {
-            columnFamilies.add(STATS_CF);
-          }
-          self.get().conn.createHBaseTable(name, columnFamilies);
-          */
         }
       }
       tablesCreated = true;
@@ -1465,13 +1468,12 @@ class HBaseReadWrite {
    *
    * @param dbName database the table is in
    * @param tableName table to update statistics for
-   * @param partName name of the partition, can be null if these are table level statistics.
    * @param partVals partition values that define partition to update statistics for. If this is
    *          null, then these will be assumed to be table level statistics
    * @param stats Stats object with stats for one or more columns
    * @throws IOException
    */
-  void updateStatistics(String dbName, String tableName, String partName, List<String> partVals,
+  void updateStatistics(String dbName, String tableName, List<String> partVals,
       ColumnStatistics stats) throws IOException {
     byte[] key = getStatisticsKey(dbName, tableName, partVals);
     String hbaseTable = getStatisticsTable(partVals);
@@ -1534,171 +1536,154 @@ class HBaseReadWrite {
    *          to translate from partName to partVals
    * @param colNames column names to fetch stats for. These columns will be fetched for all
    *          requested partitions
-   * @return list of ColumnStats, one for each partition. The values will be in the same order as
-   *         the partNames list that was passed in
+   * @return list of ColumnStats, one for each partition for which we found at least one column's
+   * stats.
    * @throws IOException
    */
   List<ColumnStatistics> getPartitionStatistics(String dbName, String tblName,
       List<String> partNames, List<List<String>> partVals, List<String> colNames)
       throws IOException {
-    List<ColumnStatistics> statsList = new ArrayList<ColumnStatistics>(partNames.size());
-    ColumnStatistics partitionStats;
-    ColumnStatisticsDesc statsDesc;
-    byte[][] colKeys = new byte[colNames.size()][];
-    List<Get> gets = new ArrayList<Get>();
-    // Initialize the list and build the Gets
-    for (int pOff = 0; pOff < partNames.size(); pOff++) {
-      // Add an entry for this partition in the stats list
-      partitionStats = new ColumnStatistics();
-      statsDesc = new ColumnStatisticsDesc();
-      statsDesc.setIsTblLevel(false);
-      statsDesc.setDbName(dbName);
-      statsDesc.setTableName(tblName);
-      statsDesc.setPartName(partNames.get(pOff));
-      partitionStats.setStatsDesc(statsDesc);
-      statsList.add(partitionStats);
-      // Build the list of Gets
-      for (int i = 0; i < colKeys.length; i++) {
-        colKeys[i] = HBaseUtils.buildKey(colNames.get(i));
-      }
-      byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff));
+    List<ColumnStatistics> statsList = new ArrayList<>(partNames.size());
+    Map<List<String>, String> valToPartMap = new HashMap<>(partNames.size());
+    List<Get> gets = new ArrayList<>(partNames.size() * colNames.size());
+    assert partNames.size() == partVals.size();
+
+    byte[][] colNameBytes = new byte[colNames.size()][];
+    for (int i = 0; i < colNames.size(); i++) {
+      colNameBytes[i] = HBaseUtils.buildKey(colNames.get(i));
+    }
+
+    for (int i = 0; i < partNames.size(); i++) {
+      valToPartMap.put(partVals.get(i), partNames.get(i));
+      byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(i));
       Get get = new Get(partKey);
-      for (byte[] colName : colKeys) {
+      for (byte[] colName : colNameBytes) {
         get.addColumn(STATS_CF, colName);
       }
       gets.add(get);
     }
 
     HTableInterface htab = conn.getHBaseTable(PART_TABLE);
-    // Get results from HBase
     Result[] results = htab.get(gets);
-    // Deserialize the stats objects and add to stats list
-    for (int pOff = 0; pOff < results.length; pOff++) {
-      for (int cOff = 0; cOff < colNames.size(); cOff++) {
-        byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKeys[cOff]);
-        if (serializedColStats == null) {
-          // There were no stats for this column, so skip it
-          continue;
+    for (int i = 0; i < results.length; i++) {
+      ColumnStatistics colStats = null;
+      for (int j = 0; j < colNameBytes.length; j++) {
+        byte[] serializedColStats = results[i].getValue(STATS_CF, colNameBytes[j]);
+        if (serializedColStats != null) {
+          if (colStats == null) {
+            // We initialize this late so that we don't create extras in the case of
+            // partitions with no stats
+            colStats = new ColumnStatistics();
+            statsList.add(colStats);
+            ColumnStatisticsDesc csd = new ColumnStatisticsDesc();
+
+            // We need to figure out which partition these column stats are from.  To do that we
+            // reconstruct the key.  We have to pull the dbName and tableName out of the key to
+            // find the partition values.
+            byte[] key = results[i].getRow();
+            String[] reconstructedKey = HBaseUtils.parseKey(key);
+            List<String> reconstructedPartVals =
+                Arrays.asList(reconstructedKey).subList(2, reconstructedKey.length);
+            String partName = valToPartMap.get(reconstructedPartVals);
+            assert partName != null;
+            csd.setIsTblLevel(false);
+            csd.setDbName(dbName);
+            csd.setTableName(tblName);
+            csd.setPartName(partName);
+            colStats.setStatsDesc(csd);
+          }
+          ColumnStatisticsObj cso =
+              HBaseUtils.deserializeStatsForOneColumn(colStats, serializedColStats);
+          cso.setColName(colNames.get(j));
+          colStats.addToStatsObj(cso);
         }
-        partitionStats = statsList.get(pOff);
-        ColumnStatisticsObj colStats =
-            HBaseUtils.deserializeStatsForOneColumn(partitionStats, serializedColStats);
-        colStats.setColName(colNames.get(cOff));
-        partitionStats.addToStatsObj(colStats);
       }
     }
+
     return statsList;
   }
 
   /**
-   * Get aggregate stats for a column from the DB and populate the bloom filter if it's not null
-   * @param dbName
-   * @param tblName
-   * @param partNames
-   * @param partVals
-   * @param colNames
-   * @return
+   * Get a reference to the stats cache.
+   * @return the stats cache.
+   */
+  StatsCache getStatsCache() {
+    return statsCache;
+  }
+
+  /**
+   * Get aggregated stats.  Only intended for use by
+   * {@link org.apache.hadoop.hive.metastore.hbase.StatsCache}.  Others should not call directly
+   * but should call StatsCache.get instead.
+   * @param key The md5 hash associated with this partition set
+   * @return stats if hbase has them, else null
    * @throws IOException
    */
-  AggrStats getAggrStats(String dbName, String tblName, List<String> partNames,
-      List<List<String>> partVals, List<String> colNames) throws IOException {
-    // One ColumnStatisticsObj per column
-    List<ColumnStatisticsObj> colStatsList = new ArrayList<ColumnStatisticsObj>();
-    AggregateStatsCache.AggrColStats colStatsAggrCached;
-    ColumnStatisticsObj colStatsAggr;
-    int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
-    float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability();
-    int partitionsRequested = partNames.size();
-    // TODO: Steal extrapolation logic from current MetaStoreDirectSql code
-    // Right now doing nothing and keeping partitionsFound == partitionsRequested
-    int partitionsFound = partitionsRequested;
-    for (String colName : colNames) {
-      if (partitionsRequested > maxPartitionsPerCacheNode) {
-        // Read from HBase but don't add to cache since it doesn't qualify the criteria
-        colStatsAggr = getAggrStatsFromDB(dbName, tblName, colName, partNames, partVals, null);
-        colStatsList.add(colStatsAggr);
-      } else {
-        // Check the cache first
-        colStatsAggrCached = aggrStatsCache.get(dbName, tblName, colName, partNames);
-        if (colStatsAggrCached != null) {
-          colStatsList.add(colStatsAggrCached.getColStats());
-        } else {
-          // Bloom filter for the new node that we will eventually add to the cache
-          BloomFilter bloomFilter =
-              new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability);
-          colStatsAggr =
-              getAggrStatsFromDB(dbName, tblName, colName, partNames, partVals, bloomFilter);
-          colStatsList.add(colStatsAggr);
-          // Update the cache to add this new aggregate node
-          aggrStatsCache.add(dbName, tblName, colName, partitionsFound, colStatsAggr, bloomFilter);
-        }
-      }
-    }
-    return new AggrStats(colStatsList, partitionsFound);
+  AggrStats getAggregatedStats(byte[] key) throws IOException {
+    byte[] serialized = read(AGGR_STATS_TABLE, key, CATALOG_CF, AGGR_STATS_STATS_COL);
+    if (serialized == null) return null;
+    return HBaseUtils.deserializeAggrStats(serialized);
   }
 
   /**
-   *
-   * @param dbName
-   * @param tblName
-   * @param partNames
-   * @param partVals
-   * @param colName
-   * @param bloomFilter
-   * @return
+   * Put aggregated stats.  Only intended for use by
+   * {@link org.apache.hadoop.hive.metastore.hbase.StatsCache}.  Others should not call directly
+   * but should call StatsCache.put instead.
+   * @param key The md5 hash associated with this partition set
+   * @param dbName Database these partitions are in
+   * @param tableName Table these partitions are in
+   * @param partNames Partition names
+   * @param colName Column stats are for
+   * @param stats Stats
+   * @throws IOException
    */
-  private ColumnStatisticsObj getAggrStatsFromDB(String dbName, String tblName, String colName,
-      List<String> partNames, List<List<String>> partVals, BloomFilter bloomFilter)
+  void putAggregatedStats(byte[] key, String dbName, String tableName, List<String> partNames,
+                          String colName, AggrStats stats) throws IOException {
+    // Serialize the part names
+    List<String> protoNames = new ArrayList<>(partNames.size() + 3);
+    protoNames.add(dbName);
+    protoNames.add(tableName);
+    protoNames.add(colName);
+    protoNames.addAll(partNames);
+    // Build a bloom Filter for these partitions
+    BloomFilter bloom = new BloomFilter(partNames.size(), STATS_BF_ERROR_RATE);
+    for (String partName : partNames) {
+      bloom.add(partName.getBytes(HBaseUtils.ENCODING));
+    }
+    byte[] serializedFilter = HBaseUtils.serializeBloomFilter(dbName, tableName, bloom);
+
+    byte[] serializedStats = HBaseUtils.serializeAggrStats(stats);
+    store(AGGR_STATS_TABLE, key, CATALOG_CF,
+        new byte[][]{AGGR_STATS_BLOOM_COL, AGGR_STATS_STATS_COL},
+        new byte[][]{serializedFilter, serializedStats});
+  }
+
+  // TODO - We shouldn't remove an entry from the cache as soon as a single partition is deleted.
+  // TODO - Instead we should keep track of how many partitions have been deleted and only remove
+  // TODO - an entry once a certain threshold, like 5%, of its partitions has been removed.
+  // TODO - That requires moving this from a filter to a co-processor.
+  /**
+   * Invalidate stats associated with the listed partitions.  This method is intended for use
+   * only by {@link org.apache.hadoop.hive.metastore.hbase.StatsCache}.
+   * @param filter serialized version of the filter to pass
+   * @return List of md5 hash keys for the partition stat sets that were removed.
+   * @throws IOException
+   */
+  List<StatsCache.StatsCacheKey>
+  invalidateAggregatedStats(HbaseMetastoreProto.AggrStatsInvalidatorFilter filter)
       throws IOException {
-    ColumnStatisticsObj colStatsAggr = new ColumnStatisticsObj();
-    boolean colStatsAggrInited = false;
-    ColumnStatsAggregator colStatsAggregator = null;
-    List<Get> gets = new ArrayList<Get>();
-    byte[] colKey = HBaseUtils.buildKey(colName);
-    // Build a list of Gets, one per partition
-    for (int pOff = 0; pOff < partNames.size(); pOff++) {
-      byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff));
-      Get get = new Get(partKey);
-      get.addColumn(STATS_CF, colKey);
-      gets.add(get);
-    }
-    HTableInterface htab = conn.getHBaseTable(PART_TABLE);
-    // Get results from HBase
-    Result[] results = htab.get(gets);
-    // Iterate through the results
-    // The results size and order is the same as the number and order of the Gets
-    // If the column is not present in a partition, the Result object will be empty
-    for (int pOff = 0; pOff < partNames.size(); pOff++) {
-      if (results[pOff].isEmpty()) {
-        // There were no stats for this column, so skip it
-        continue;
-      }
-      byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKey);
-      if (serializedColStats == null) {
-        // There were no stats for this column, so skip it
-        continue;
-      }
-      ColumnStatisticsObj colStats =
-          HBaseUtils.deserializeStatsForOneColumn(null, serializedColStats);
-      if (!colStatsAggrInited) {
-        // This is the 1st column stats object we got
-        colStatsAggr.setColName(colName);
-        colStatsAggr.setColType(colStats.getColType());
-        colStatsAggr.setStatsData(colStats.getStatsData());
-        colStatsAggregator =
-            ColumnStatsAggregatorFactory.getColumnStatsAggregator(colStats.getStatsData()
-                .getSetField());
-        colStatsAggrInited = true;
-      } else {
-        // Perform aggregation with whatever we've already aggregated
-        colStatsAggregator.aggregate(colStatsAggr, colStats);
-      }
-      // Add partition to the bloom filter if it's requested
-      if (bloomFilter != null) {
-        bloomFilter.add(partNames.get(pOff).getBytes());
-      }
+    Iterator<Result> results = scan(AGGR_STATS_TABLE, new 
AggrStatsInvalidatorFilter(filter));
+    if (!results.hasNext()) return Collections.emptyList();
+    List<Delete> deletes = new ArrayList<>();
+    List<StatsCache.StatsCacheKey> keys = new ArrayList<>();
+    while (results.hasNext()) {
+      Result result = results.next();
+      deletes.add(new Delete(result.getRow()));
+      keys.add(new StatsCache.StatsCacheKey(result.getRow()));
     }
-    return colStatsAggr;
+    HTableInterface htab = conn.getHBaseTable(AGGR_STATS_TABLE);
+    htab.delete(deletes);
+    return keys;
   }
 
   private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) {
@@ -1718,9 +1703,12 @@ class HBaseReadWrite {
    * This should be called whenever a new query is started.
    */
   void flushCatalogCache() {
-    for (Counter counter : counters) {
-      LOG.debug(counter.dump());
-      counter.clear();
+    if (LOG.isDebugEnabled()) {
+      for (Counter counter : counters) {
+        LOG.debug(counter.dump());
+        counter.clear();
+      }
+      statsCache.dumpCounters();
     }
     tableCache.flush();
     sdCache.flush();
@@ -1794,6 +1782,10 @@ class HBaseReadWrite {
     return scan(table, null, null, colFam, colName, filter);
   }
 
+  private Iterator<Result> scan(String table, Filter filter) throws IOException {
+    return scan(table, null, null, null, null, filter);
+  }
+
   private Iterator<Result> scan(String table, byte[] keyStart, byte[] keyEnd, byte[] colFam,
                                           byte[] colName, Filter filter) throws IOException {
     HTableInterface htab = conn.getHBaseTable(table);
@@ -1804,7 +1796,9 @@ class HBaseReadWrite {
     if (keyEnd != null) {
       s.setStopRow(keyEnd);
     }
-    s.addColumn(colFam, colName);
+    if (colFam != null && colName != null) {
+      s.addColumn(colFam, colName);
+    }
     if (filter != null) {
       s.setFilter(filter);
     }
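
On the STATS_BF_ERROR_RATE constant above: the standard bloom filter sizing formulas give a feel for why a tight error rate is affordable here. A rough sketch using the classic formulas (not code from this patch, and it assumes the BloomFilter class sizes itself in the usual optimal way):

    // Sketch only: m = -n*ln(p)/(ln 2)^2 bits, k = (m/n)*ln 2 hash functions.
    public class BloomSizingSketch {
      static long optimalBits(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
      }
      static int optimalHashes(long n, long m) {
        return (int) Math.max(1, Math.round((double) m / n * Math.log(2)));
      }
      public static void main(String[] args) {
        long n = 1000;                   // hypothetical: 1000 partitions in one aggregate
        long m1 = optimalBits(n, 0.01);  // ~9.6k bits
        long m2 = optimalBits(n, 0.001); // ~14.4k bits, roughly 1.5x for 10x fewer false positives
        System.out.println(m1 + " bits, " + optimalHashes(n, m1) + " hashes at p=0.01");
        System.out.println(m2 + " bits, " + optimalHashes(n, m2) + " hashes at p=0.001");
      }
    }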

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 4fa2ae5..9782859 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hive.metastore.hbase;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheLoader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -370,6 +372,9 @@ public class HBaseStore implements RawStore {
     openTransaction();
     try {
       getHBase().deletePartition(dbName, tableName, part_vals);
+      // Drop any cached stats that reference this partition
+      getHBase().getStatsCache().invalidate(dbName, tableName,
+          buildExternalPartName(dbName, tableName, part_vals));
       commit = true;
       return true;
     } catch (IOException e) {
@@ -1472,7 +1477,7 @@ public class HBaseStore implements RawStore {
     openTransaction();
     try {
       getHBase().updateStatistics(colStats.getStatsDesc().getDbName(),
-          colStats.getStatsDesc().getTableName(), null, null, colStats);
+          colStats.getStatsDesc().getTableName(), null, colStats);
       commit = true;
       return true;
     } catch (IOException e) {
@@ -1491,8 +1496,10 @@ public class HBaseStore implements RawStore {
     openTransaction();
     try {
       getHBase().updateStatistics(statsObj.getStatsDesc().getDbName(),
-          statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(),
-          partVals, statsObj);
+          statsObj.getStatsDesc().getTableName(), partVals, statsObj);
+      // We need to invalidate aggregates that include this partition
+      getHBase().getStatsCache().invalidate(statsObj.getStatsDesc().getDbName(),
+          statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName());
       commit = true;
       return true;
     } catch (IOException e) {
@@ -1528,7 +1535,6 @@ public class HBaseStore implements RawStore {
     for (String partName : partNames) {
       partVals.add(partNameToVals(partName));
     }
-    for (String partName : partNames) partVals.add(partNameToVals(partName));
     boolean commit = false;
     openTransaction();
     try {
@@ -1574,9 +1580,24 @@ public class HBaseStore implements RawStore {
     boolean commit = false;
     openTransaction();
     try {
-      AggrStats stats = getHBase().getAggrStats(dbName, tblName, partNames, partVals, colNames);
+      AggrStats aggrStats = new AggrStats();
+      for (String colName : colNames) {
+        try {
+          AggrStats oneCol =
+              getHBase().getStatsCache().get(dbName, tblName, partNames, colName);
+          if (oneCol.getColStatsSize() > 0) {
+            assert oneCol.getColStatsSize() == 1;
+            aggrStats.setPartsFound(aggrStats.getPartsFound() + oneCol.getPartsFound());
+            aggrStats.addToColStats(oneCol.getColStats().get(0));
+          }
+        } catch (CacheLoader.InvalidCacheLoadException e) {
+          LOG.debug("Found no stats for column " + colName);
+          // This means we have no stats at all for this column for these partitions, so just
+          // move on.
+        }
+      }
       commit = true;
-      return stats;
+      return aggrStats;
     } catch (IOException e) {
       LOG.error("Unable to fetch aggregate column statistics", e);
       throw new MetaException("Failed fetching aggregate column statistics, " + e.getMessage());
@@ -2068,7 +2089,7 @@ public class HBaseStore implements RawStore {
     return FileUtils.makePartName(partCols, partVals);
   }
 
-  private List<String> partNameToVals(String name) {
+  private static List<String> partNameToVals(String name) {
     if (name == null) return null;
     List<String> vals = new ArrayList<String>();
     String[] kvp = name.split("/");
@@ -2078,6 +2099,14 @@ public class HBaseStore implements RawStore {
     return vals;
   }
 
+  static List<List<String>> partNameListToValsList(List<String> partNames) {
+    List<List<String>> valLists = new ArrayList<List<String>>(partNames.size());
+    for (String partName : partNames) {
+      valLists.add(partNameToVals(partName));
+    }
+    return valLists;
+  }
+
   private String likeToRegex(String like) {
     if (like == null) return null;
     // Convert Hive's strange like syntax to Java regex.  Per
@@ -2097,4 +2126,8 @@ public class HBaseStore implements RawStore {
       rollbackTransaction();
     }
   }
+
+  @VisibleForTesting HBaseReadWrite backdoor() {
+    return getHBase();
+  }
 }
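
A usage note on partNameToVals and partNameListToValsList above: Hive partition names are the key=value/key=value strings produced by FileUtils.makePartName, and these helpers strip the keys and keep the values. A small sketch of that conversion (illustrative only; the value-extraction line is an assumption since that part of the method body falls outside the hunk, and name escaping is ignored):

    // Sketch only: "ds=2015-01-01/hr=12" -> [2015-01-01, 12]
    import java.util.ArrayList;
    import java.util.List;

    public class PartNameSketch {
      static List<String> partNameToVals(String name) {
        if (name == null) return null;
        List<String> vals = new ArrayList<String>();
        for (String kv : name.split("/")) {
          vals.add(kv.substring(kv.indexOf('=') + 1)); // assumed value extraction
        }
        return vals;
      }

      public static void main(String[] args) {
        System.out.println(partNameToVals("ds=2015-01-01/hr=12"));
      }
    }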

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 969c979..4d57af2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -23,11 +23,11 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Decimal;
@@ -50,7 +50,7 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TFieldIdEnum;
+import org.apache.hive.common.util.BloomFilter;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -98,6 +98,11 @@ class HBaseUtils {
     return protoKey.getBytes(ENCODING);
   }
 
+  static String[] parseKey(byte[] serialized) {
+    String munged = new String(serialized, ENCODING);
+    return munged.split(KEY_SEPARATOR_STR);
+  }
+
   private static HbaseMetastoreProto.Parameters buildParameters(Map<String, String> params) {
     List<HbaseMetastoreProto.ParameterEntry> entries =
         new ArrayList<HbaseMetastoreProto.ParameterEntry>();
@@ -902,14 +907,40 @@ class HBaseUtils {
     return sdParts;
   }
 
-  static byte[] serializeStatsForOneColumn(ColumnStatistics partitionColumnStats, ColumnStatisticsObj colStats)
+  static byte[] serializeBloomFilter(String dbName, String tableName, BloomFilter bloom) {
+    long[] bitSet = bloom.getBitSet();
+    List<Long> bits = new ArrayList<>(bitSet.length);
+    for (int i = 0; i < bitSet.length; i++) bits.add(bitSet[i]);
+    HbaseMetastoreProto.AggrStatsBloomFilter.BloomFilter protoBloom =
+        HbaseMetastoreProto.AggrStatsBloomFilter.BloomFilter.newBuilder()
+        .setNumBits(bloom.getBitSize())
+        .setNumFuncs(bloom.getNumHashFunctions())
+        .addAllBits(bits)
+        .build();
+
+    HbaseMetastoreProto.AggrStatsBloomFilter proto =
+        HbaseMetastoreProto.AggrStatsBloomFilter.newBuilder()
+            .setDbName(ByteString.copyFrom(dbName.getBytes(ENCODING)))
+            .setTableName(ByteString.copyFrom(tableName.getBytes(ENCODING)))
+            .setBloomFilter(protoBloom)
+            .setAggregatedAt(System.currentTimeMillis())
+            .build();
+
+    return proto.toByteArray();
+  }
+
+  private static HbaseMetastoreProto.ColumnStats
+  protoBufStatsForOneColumn(ColumnStatistics partitionColumnStats, ColumnStatisticsObj colStats)
       throws IOException {
     HbaseMetastoreProto.ColumnStats.Builder builder = HbaseMetastoreProto.ColumnStats.newBuilder();
-    builder.setLastAnalyzed(partitionColumnStats.getStatsDesc().getLastAnalyzed());
-    if (colStats.getColType() == null) {
-      throw new RuntimeException("Column type must be set");
+    if (partitionColumnStats != null) {
+      builder.setLastAnalyzed(partitionColumnStats.getStatsDesc().getLastAnalyzed());
     }
+    assert colStats.getColType() != null;
     builder.setColumnType(colStats.getColType());
+    assert colStats.getColName() != null;
+    builder.setColumnName(colStats.getColName());
+
     ColumnStatisticsData colData = colStats.getStatsData();
     switch (colData.getSetField()) {
       case BOOLEAN_STATS:
@@ -987,12 +1018,23 @@ class HBaseUtils {
       default:
         throw new RuntimeException("Woh, bad.  Unknown stats type!");
     }
-    return builder.build().toByteArray();
+    return builder.build();
+  }
+
+  static byte[] serializeStatsForOneColumn(ColumnStatistics partitionColumnStats,
+                                           ColumnStatisticsObj colStats) throws IOException {
+    return protoBufStatsForOneColumn(partitionColumnStats, colStats).toByteArray();
   }
 
   static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics partitionColumnStats,
       byte[] bytes) throws IOException {
     HbaseMetastoreProto.ColumnStats proto = HbaseMetastoreProto.ColumnStats.parseFrom(bytes);
+    return statsForOneColumnFromProtoBuf(partitionColumnStats, proto);
+  }
+
+  private static ColumnStatisticsObj
+  statsForOneColumnFromProtoBuf(ColumnStatistics partitionColumnStats,
+                                HbaseMetastoreProto.ColumnStats proto) throws IOException {
     ColumnStatisticsObj colStats = new ColumnStatisticsObj();
     long lastAnalyzed = proto.getLastAnalyzed();
     if (partitionColumnStats != null) {
@@ -1000,6 +1042,7 @@ class HBaseUtils {
           Math.max(lastAnalyzed, partitionColumnStats.getStatsDesc().getLastAnalyzed()));
     }
     colStats.setColType(proto.getColumnType());
+    colStats.setColName(proto.getColumnName());
 
     ColumnStatisticsData colData = new ColumnStatisticsData();
     if (proto.hasBoolStats()) {
@@ -1067,6 +1110,30 @@ class HBaseUtils {
     return colStats;
   }
 
+  static byte[] serializeAggrStats(AggrStats aggrStats) throws IOException {
+    List<HbaseMetastoreProto.ColumnStats> protoColStats =
+        new ArrayList<>(aggrStats.getColStatsSize());
+    for (ColumnStatisticsObj cso : aggrStats.getColStats()) {
+      protoColStats.add(protoBufStatsForOneColumn(null, cso));
+    }
+    return HbaseMetastoreProto.AggrStats.newBuilder()
+        .setPartsFound(aggrStats.getPartsFound())
+        .addAllColStats(protoColStats)
+        .build()
+        .toByteArray();
+  }
+
+  static AggrStats deserializeAggrStats(byte[] serialized) throws IOException {
+    HbaseMetastoreProto.AggrStats protoAggrStats =
+        HbaseMetastoreProto.AggrStats.parseFrom(serialized);
+    AggrStats aggrStats = new AggrStats();
+    aggrStats.setPartsFound(protoAggrStats.getPartsFound());
+    for (HbaseMetastoreProto.ColumnStats protoCS : protoAggrStats.getColStatsList()) {
+      aggrStats.addToColStats(statsForOneColumnFromProtoBuf(null, protoCS));
+    }
+    return aggrStats;
+  }
+
   /**
    * @param keyStart byte array representing the start prefix
    * @return byte array corresponding to the next possible prefix

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
new file mode 100644
index 0000000..0d3ed40
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator;
+import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A cache for stats.  This is only intended for use by
+ * {@link org.apache.hadoop.hive.metastore.hbase.HBaseReadWrite} and should not be used outside
+ * that class.
+ */
+class StatsCache {
+
+  private static final Log LOG = LogFactory.getLog(StatsCache.class.getName());
+  private static StatsCache self = null;
+
+  private LoadingCache<StatsCacheKey, AggrStats> cache;
+  private Invalidator invalidator;
+  private long runInvalidatorEvery;
+  private long maxTimeInCache;
+  private boolean invalidatorHasRun;
+
+  @VisibleForTesting Counter misses;
+  @VisibleForTesting Counter hbaseHits;
+  @VisibleForTesting Counter totalGets;
+
+  static synchronized StatsCache getInstance(Configuration conf) {
+    if (self == null) {
+      self = new StatsCache(conf);
+    }
+    return self;
+  }
+
+  private StatsCache(Configuration conf) {
+    final StatsCache me = this;
+    cache = CacheBuilder.newBuilder()
+        .maximumSize(
+            HiveConf.getIntVar(conf, 
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_CACHE_ENTRIES))
+        .expireAfterWrite(HiveConf.getTimeVar(conf,
+            HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_MEMORY_TTL, 
TimeUnit.SECONDS), TimeUnit.SECONDS)
+        .build(new CacheLoader<StatsCacheKey, AggrStats>() {
+          @Override
+          public AggrStats load(StatsCacheKey key) throws Exception {
+            HBaseReadWrite hrw = HBaseReadWrite.getInstance();
+            AggrStats aggrStats = hrw.getAggregatedStats(key.hashed);
+            if (aggrStats == null) {
+              misses.incr();
+              ColumnStatsAggregator aggregator = null;
+              ColumnStatisticsObj statsObj = null;
+              aggrStats = new AggrStats();
+              LOG.debug("Unable to find aggregated stats for " + key.colName + 
", aggregating");
+              List<ColumnStatistics> css = 
hrw.getPartitionStatistics(key.dbName, key.tableName,
+                  key.partNames, 
HBaseStore.partNameListToValsList(key.partNames),
+                  Collections.singletonList(key.colName));
+              if (css != null && css.size() > 0) {
+                aggrStats.setPartsFound(css.size());
+                for (ColumnStatistics cs : css) {
+                  for (ColumnStatisticsObj cso : cs.getStatsObj()) {
+                    if (statsObj == null) {
+                      statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(key.colName,
+                          cso.getStatsData().getSetField());
+                    }
+                    if (aggregator == null) {
+                      aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
+                          cso.getStatsData().getSetField());
+                    }
+                    aggregator.aggregate(statsObj, cso);
+                  }
+                }
+                aggrStats.addToColStats(statsObj);
+                me.put(key, aggrStats);
+              }
+            } else {
+              hbaseHits.incr();
+            }
+            return aggrStats;
+          }
+        });
+    misses = new Counter("Stats cache table misses");
+    hbaseHits = new Counter("Stats cache table hits");
+    totalGets = new Counter("Total get calls to the stats cache");
+
+    maxTimeInCache = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_HBASE_TTL, TimeUnit.SECONDS);
+    // We want runEvery in milliseconds, even though we give the default value in the conf in
+    // seconds.
+    runInvalidatorEvery = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY, TimeUnit.MILLISECONDS);
+
+    invalidator = new Invalidator();
+    invalidator.setDaemon(true);
+    invalidator.start();
+  }
+
+  /**
+   * Add an object to the cache.
+   * @param key Key for this entry
+   * @param aggrStats stats
+   * @throws java.io.IOException
+   */
+  void put(StatsCacheKey key, AggrStats aggrStats) throws IOException {
+    HBaseReadWrite.getInstance().putAggregatedStats(key.hashed, key.dbName, key.tableName,
+        key.partNames,
+        key.colName, aggrStats);
+    cache.put(key, aggrStats);
+  }
+
+  /**
+   * Get aggregated stats for a column across the given set of partitions
+   * @param dbName name of database table is in
+   * @param tableName name of table
+   * @param partNames names of the partitions
+   * @param colName name of column to get stats for
+   * @return stats object for this column, or null if none cached
+   * @throws java.io.IOException
+   */
+  AggrStats get(String dbName, String tableName, List<String> partNames, String colName)
+      throws IOException {
+    totalGets.incr();
+    StatsCacheKey key = new StatsCacheKey(dbName, tableName, partNames, colName);
+    try {
+      return cache.get(key);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Remove all entries that are related to a particular set of partitions.  This should be
+   * called when partitions are deleted or stats are updated.
+   * @param dbName name of database table is in
+   * @param tableName name of table
+   * @param partName name of the partition
+   * @throws IOException
+   */
+  void invalidate(String dbName, String tableName, String partName)
+      throws IOException {
+    invalidator.addToQueue(
+        HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry.newBuilder()
+            .setDbName(ByteString.copyFrom(dbName.getBytes(HBaseUtils.ENCODING)))
+            .setTableName(ByteString.copyFrom(tableName.getBytes(HBaseUtils.ENCODING)))
+            .setPartName(ByteString.copyFrom(partName.getBytes(HBaseUtils.ENCODING)))
+            .build());
+  }
+
+  void dumpCounters() {
+    LOG.debug(misses.dump());
+    LOG.debug(hbaseHits.dump());
+    LOG.debug(totalGets.dump());
+  }
+
+  /**
+   * Completely dump the cache from memory, used to test that we can access stats from HBase itself.
+   * @throws IOException
+   */
+  @VisibleForTesting void flushMemory() throws IOException {
+    cache.invalidateAll();
+  }
+
+  @VisibleForTesting void resetCounters() {
+    misses.clear();
+    hbaseHits.clear();
+    totalGets.clear();
+  }
+
+  @VisibleForTesting void setRunInvalidatorEvery(long runEvery) {
+    runInvalidatorEvery = runEvery;
+  }
+
+  @VisibleForTesting void setMaxTimeInCache(long maxTime) {
+    maxTimeInCache = maxTime;
+  }
+
+  @VisibleForTesting void wakeInvalidator() throws InterruptedException {
+    invalidatorHasRun = false;
+    // Wait through 2 cycles so we're sure our entry won't be picked as too new.
+    Thread.sleep(2 * runInvalidatorEvery);
+    invalidator.interrupt();
+    while (!invalidatorHasRun) {
+      Thread.sleep(10);
+    }
+  }
+
+  static class StatsCacheKey {
+    final byte[] hashed;
+    String dbName;
+    String tableName;
+    List<String> partNames;
+    String colName;
+    private MessageDigest md;
+
+    StatsCacheKey(byte[] key) {
+      hashed = key;
+    }
+
+    StatsCacheKey(String dbName, String tableName, List<String> partNames, String colName) {
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.partNames = partNames;
+      this.colName = colName;
+
+      try {
+        md = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      md.update(dbName.getBytes(HBaseUtils.ENCODING));
+      md.update(tableName.getBytes(HBaseUtils.ENCODING));
+      Collections.sort(this.partNames);
+      for (String s : partNames) {
+        md.update(s.getBytes(HBaseUtils.ENCODING));
+      }
+      md.update(colName.getBytes(HBaseUtils.ENCODING));
+      hashed = md.digest();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof StatsCacheKey)) return false;
+      StatsCacheKey that = (StatsCacheKey)other;
+      return Arrays.equals(hashed, that.hashed);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(hashed);
+    }
+  }
+
+  private class Invalidator extends Thread {
+    private List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> entries = new ArrayList<>();
+    private Lock lock = new ReentrantLock();
+
+    void addToQueue(HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry entry) {
+      lock.lock();
+      try {
+        entries.add(entry);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        long startedAt = System.currentTimeMillis();
+        List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> thisRun = null;
+        lock.lock();
+        try {
+          if (entries.size() > 0) {
+            thisRun = entries;
+            entries = new ArrayList<>();
+          }
+        } finally {
+          lock.unlock();
+        }
+
+        if (thisRun != null) {
+          try {
+            HbaseMetastoreProto.AggrStatsInvalidatorFilter filter =
+                HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder()
+                .setRunEvery(runInvalidatorEvery)
+                .setMaxCacheEntryLife(maxTimeInCache)
+                .addAllToInvalidate(thisRun)
+                .build();
+            List<StatsCacheKey> keys =
+                HBaseReadWrite.getInstance().invalidateAggregatedStats(filter);
+            cache.invalidateAll(keys);
+          } catch (IOException e) {
+            // Not a lot I can do here
+            LOG.error("Caught error while invalidating entries in the cache", 
e);
+          }
+        }
+        invalidatorHasRun = true;
+
+        try {
+          sleep(runInvalidatorEvery - (System.currentTimeMillis() - startedAt));
+        } catch (InterruptedException e) {
+          LOG.warn("Interupted while sleeping", e);
+        }
+      }
+    }
+  }
+}
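
One behavior of StatsCacheKey worth calling out: the constructor sorts the partition names before hashing, so keys built from the same partition set in any order compare equal (and the caller's list is sorted in place as a side effect). A minimal sketch, assuming same-package access since the class is package-private and the usual java.util imports:

    // Sketch only: same partitions, different order -> equal MD5-based keys.
    List<String> a = new ArrayList<String>(Arrays.asList("ds=2015-01-02", "ds=2015-01-01"));
    List<String> b = new ArrayList<String>(Arrays.asList("ds=2015-01-01", "ds=2015-01-02"));
    StatsCache.StatsCacheKey k1 = new StatsCache.StatsCacheKey("default", "t", a, "c1");
    StatsCache.StatsCacheKey k2 = new StatsCache.StatsCacheKey("default", "t", b, "c1");
    assert k1.equals(k2) && k1.hashCode() == k2.hashCode();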

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
index 3fa0614..ebecfe3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
@@ -19,7 +19,15 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 
 public class ColumnStatsAggregatorFactory {
 
@@ -41,8 +49,51 @@ public class ColumnStatsAggregatorFactory {
     case DECIMAL_STATS:
       return new DecimalColumnStatsAggregator();
     default:
+      throw new RuntimeException("Woh, bad.  Unknown stats type " + 
type.toString());
+    }
+  }
+
+  public static ColumnStatisticsObj newColumnStaticsObj(String colName, _Fields type) {
+    ColumnStatisticsObj cso = new ColumnStatisticsObj();
+    ColumnStatisticsData csd = new ColumnStatisticsData();
+    cso.setColName(colName);
+    switch (type) {
+    case BOOLEAN_STATS:
+      csd.setBooleanStats(new BooleanColumnStatsData());
+      cso.setColType("boolean");
+      break;
+
+    case LONG_STATS:
+      csd.setLongStats(new LongColumnStatsData());
+      cso.setColType("long");
+      break;
+
+    case DOUBLE_STATS:
+      csd.setDoubleStats(new DoubleColumnStatsData());
+      cso.setColType("double");
+      break;
+
+    case STRING_STATS:
+      csd.setStringStats(new StringColumnStatsData());
+      cso.setColType("string");
+      break;
+
+    case BINARY_STATS:
+      csd.setBinaryStats(new BinaryColumnStatsData());
+      cso.setColType("binary");
+      break;
+
+    case DECIMAL_STATS:
+      csd.setDecimalStats(new DecimalColumnStatsData());
+      cso.setColType("decimal");
+      break;
+
+    default:
       throw new RuntimeException("Woh, bad.  Unknown stats type!");
     }
+
+    cso.setStatsData(csd);
+    return cso;
   }
 
 }
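
For context, newColumnStaticsObj above is what StatsCache uses to seed an empty aggregate before folding in per-partition stats. A small usage sketch (the column name is illustrative; the getters are the standard thrift accessors used elsewhere in this patch):

    // Sketch only: seed an empty long-stats object for aggregation.
    ColumnStatisticsObj cso = ColumnStatsAggregatorFactory.newColumnStaticsObj(
        "viewtime", ColumnStatisticsData._Fields.LONG_STATS);
    // cso.getColName() -> "viewtime", cso.getColType() -> "long"
    // cso.getStatsData().getSetField() -> LONG_STATS, backed by an empty LongColumnStatsData
    // that a ColumnStatsAggregator then fills in via aggregate().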

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
----------------------------------------------------------------------
diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
index 0aa0d21..3cd8867 100644
--- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
+++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
@@ -22,6 +22,35 @@ enum PrincipalType {
     ROLE = 1;
 }
 
+message AggrStats {
+  required int64 parts_found = 1;
+  repeated ColumnStats col_stats = 2;
+}
+
+message AggrStatsBloomFilter {
+  message BloomFilter {
+    required int32 num_bits = 1;
+    required int32 num_funcs = 2;
+    repeated int64 bits = 3;
+  }
+  required bytes db_name = 1;
+  required bytes table_name = 2;
+  required BloomFilter bloom_filter = 3;
+  required int64 aggregated_at = 4;
+}
+
+message AggrStatsInvalidatorFilter {
+  message Entry {
+    required bytes db_name = 1;
+    required bytes table_name = 2;
+    required bytes part_name = 3;
+  }
+
+  repeated Entry to_invalidate = 1;
+  required int64 run_every = 2;
+  required int64 max_cache_entry_life = 3;
+}
+
 message ColumnStats {
 
   message BooleanStats {
@@ -63,6 +92,7 @@ message ColumnStats {
   optional StringStats string_stats = 8;
   optional StringStats binary_stats = 9;
   optional DecimalStats decimal_stats = 10;
+  optional string column_name = 11;
 }
 
 message Database {
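
As a rough sketch of how the AggrStatsInvalidatorFilter message above would be populated, assuming the standard protoc-generated Java builders for these proto2 messages; the db/table/partition names and the timing values are made-up placeholders:

    import com.google.protobuf.ByteString;

    // Assemble one invalidation entry plus the scheduling knobs, then serialize.
    HbaseMetastoreProto.AggrStatsInvalidatorFilter filterProto =
        HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder()
            .addToInvalidate(
                HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry.newBuilder()
                    .setDbName(ByteString.copyFromUtf8("default"))
                    .setTableName(ByteString.copyFromUtf8("hit"))
                    .setPartName(ByteString.copyFromUtf8("ds=today"))
                    .build())
            .setRunEvery(5000L)             // run_every, placeholder value
            .setMaxCacheEntryLife(3600000L) // max_cache_entry_life, placeholder value
            .build();
    byte[] serialized = filterProto.toByteArray();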

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java
new file mode 100644
index 0000000..af8f5fc
--- /dev/null
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class TestHBaseAggregateStatsCache {
+  private static final Log LOG = 
LogFactory.getLog(TestHBaseAggregateStatsCache.class.getName());
+
+  @Mock HTableInterface htable;
+  private HBaseStore store;
+  SortedMap<String, Cell> rows = new TreeMap<>();
+
+  @Before
+  public void before() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    HiveConf conf = new HiveConf();
+    conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+    store = MockUtils.init(conf, htable, rows);
+    store.backdoor().getStatsCache().resetCounters();
+  }
+
+  private static interface Checker {
+    void checkStats(AggrStats aggrStats) throws Exception;
+  }
+
+  // Due to limitations in the Mock infrastructure we use for HBase testing we can only test
+  // this for a single-column table, and we can't really test hits in HBase, only in memory or
+  // built from scratch.  But it's still useful to cover many bugs.  More in-depth testing with
+  // multiple columns and with HBase hits is done in TestHBaseAggrStatsCacheIntegration.
+
+  @Test
+  public void allWithStats() throws Exception {
+    String dbName = "default";
+    String tableName = "hit";
+    List<String> partVals1 = Arrays.asList("today");
+    List<String> partVals2 = Arrays.asList("yesterday");
+    long now = System.currentTimeMillis();
+
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, Collections.<String, String>emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, 
sd, partCols,
+        Collections.<String, String>emptyMap(), null, null, null);
+    store.createTable(table);
+
+    for (List<String> partVals : Arrays.asList(partVals1, partVals2)) {
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/ds=" + partVals.get(0));
+      Partition part = new Partition(partVals, dbName, tableName, (int) now, 
(int) now, psd,
+          Collections.<String, String>emptyMap());
+      store.addPartition(part);
+
+      ColumnStatistics cs = new ColumnStatistics();
+      ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, 
tableName);
+      desc.setLastAnalyzed(now);
+      desc.setPartName("ds=" + partVals.get(0));
+      cs.setStatsDesc(desc);
+      ColumnStatisticsObj obj = new ColumnStatisticsObj();
+      obj.setColName("col1");
+      obj.setColType("boolean");
+      ColumnStatisticsData data = new ColumnStatisticsData();
+      BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+      bcsd.setNumFalses(10);
+      bcsd.setNumTrues(20);
+      bcsd.setNumNulls(30);
+      data.setBooleanStats(bcsd);
+      obj.setStatsData(data);
+      cs.addToStatsObj(obj);
+
+      store.updatePartitionColumnStatistics(cs, partVals);
+    }
+
+    Checker statChecker = new Checker() {
+      @Override
+      public void checkStats(AggrStats aggrStats) throws Exception {
+        Assert.assertEquals(2, aggrStats.getPartsFound());
+        Assert.assertEquals(1, aggrStats.getColStatsSize());
+        ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+        Assert.assertEquals("col1", cso.getColName());
+        Assert.assertEquals("boolean", cso.getColType());
+        BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats();
+        Assert.assertEquals(20, bcsd.getNumFalses());
+        Assert.assertEquals(40, bcsd.getNumTrues());
+        Assert.assertEquals(60, bcsd.getNumNulls());
+      }
+    };
+
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+        Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+    statChecker.checkStats(aggrStats);
+
+    // Check that we had to build it from the stats
+    Assert.assertEquals(0, 
store.backdoor().getStatsCache().hbaseHits.getCnt());
+    Assert.assertEquals(1, 
store.backdoor().getStatsCache().totalGets.getCnt());
+    Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+
+    // Call again, this time it should come from memory.  Also, reverse the name order this
+    // time to ensure that we still hit.
+    aggrStats = store.get_aggr_stats_for(dbName, tableName,
+        Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+    statChecker.checkStats(aggrStats);
+
+    Assert.assertEquals(0, 
store.backdoor().getStatsCache().hbaseHits.getCnt());
+    Assert.assertEquals(2, 
store.backdoor().getStatsCache().totalGets.getCnt());
+    Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+  }
+
+
+  @Test
+  public void noneWithStats() throws Exception {
+    String dbName = "default";
+    String tableName = "nws";
+    List<String> partVals1 = Arrays.asList("today");
+    List<String> partVals2 = Arrays.asList("yesterday");
+    long now = System.currentTimeMillis();
+
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, Collections.<String, String>emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, 
sd, partCols,
+        Collections.<String, String>emptyMap(), null, null, null);
+    store.createTable(table);
+
+    for (List<String> partVals : Arrays.asList(partVals1, partVals2)) {
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/nws/ds=" + partVals.get(0));
+      Partition part = new Partition(partVals, dbName, tableName, (int) now, 
(int) now, psd,
+          Collections.<String, String>emptyMap());
+      store.addPartition(part);
+    }
+
+    Checker statChecker = new Checker() {
+      @Override
+      public void checkStats(AggrStats aggrStats) throws Exception {
+        Assert.assertEquals(0, aggrStats.getPartsFound());
+      }
+    };
+
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+        Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+    statChecker.checkStats(aggrStats);
+  }
+
+  @Test
+  public void someNonexistentPartitions() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    List<String> partVals1 = Arrays.asList("today");
+    List<String> partVals2 = Arrays.asList("yesterday");
+    long now = System.currentTimeMillis();
+
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, Collections.<String, String>emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, 
sd, partCols,
+        Collections.<String, String>emptyMap(), null, null, null);
+    store.createTable(table);
+
+    StorageDescriptor psd = new StorageDescriptor(sd);
+    psd.setLocation("file:/tmp/default/hit/ds=" + partVals1.get(0));
+    Partition part = new Partition(partVals1, dbName, tableName, (int) now, 
(int) now, psd,
+        Collections.<String, String>emptyMap());
+    store.addPartition(part);
+
+    ColumnStatistics cs = new ColumnStatistics();
+    ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, 
tableName);
+    desc.setLastAnalyzed(now);
+    desc.setPartName("ds=" + partVals1.get(0));
+    cs.setStatsDesc(desc);
+    ColumnStatisticsObj obj = new ColumnStatisticsObj();
+    obj.setColName("col1");
+    obj.setColType("double");
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    DoubleColumnStatsData dcsd = new DoubleColumnStatsData();
+    dcsd.setHighValue(1000.2342343);
+    dcsd.setLowValue(-20.1234213423);
+    dcsd.setNumNulls(30);
+    dcsd.setNumDVs(12342);
+    data.setDoubleStats(dcsd);
+    obj.setStatsData(data);
+    cs.addToStatsObj(obj);
+
+    store.updatePartitionColumnStatistics(cs, partVals1);
+
+    Checker statChecker = new Checker() {
+      @Override
+      public void checkStats(AggrStats aggrStats) throws Exception {
+        Assert.assertEquals(1, aggrStats.getPartsFound());
+        Assert.assertEquals(1, aggrStats.getColStatsSize());
+        ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+        Assert.assertEquals("col1", cso.getColName());
+        Assert.assertEquals("double", cso.getColType());
+        DoubleColumnStatsData dcsd = cso.getStatsData().getDoubleStats();
+        Assert.assertEquals(1000.23, dcsd.getHighValue(), 0.01);
+        Assert.assertEquals(-20.12, dcsd.getLowValue(), 0.01);
+        Assert.assertEquals(30, dcsd.getNumNulls());
+        Assert.assertEquals(12342, dcsd.getNumDVs());
+      }
+    };
+
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+        Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+    statChecker.checkStats(aggrStats);
+
+    // Check that we had to build it from the stats
+    Assert.assertEquals(0, 
store.backdoor().getStatsCache().hbaseHits.getCnt());
+    Assert.assertEquals(1, 
store.backdoor().getStatsCache().totalGets.getCnt());
+    Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+
+    // Call again, this time it should come from memory.  Also, reverse the name order this
+    // time to ensure that we still hit.
+    aggrStats = store.get_aggr_stats_for(dbName, tableName,
+        Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+    statChecker.checkStats(aggrStats);
+
+    Assert.assertEquals(0, 
store.backdoor().getStatsCache().hbaseHits.getCnt());
+    Assert.assertEquals(2, 
store.backdoor().getStatsCache().totalGets.getCnt());
+    Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+  }
+
+  @Test
+  public void nonexistentPartitions() throws Exception {
+    String dbName = "default";
+    String tableName = "nep";
+    List<String> partVals1 = Arrays.asList("today");
+    List<String> partVals2 = Arrays.asList("yesterday");
+    long now = System.currentTimeMillis();
+
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, Collections.<String, String>emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, 
sd, partCols,
+        Collections.<String, String>emptyMap(), null, null, null);
+    store.createTable(table);
+
+    Checker statChecker = new Checker() {
+      @Override
+      public void checkStats(AggrStats aggrStats) throws Exception {
+        Assert.assertEquals(0, aggrStats.getPartsFound());
+      }
+    };
+
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+        Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+    statChecker.checkStats(aggrStats);
+
+    // Check that we had to build it from the stats
+    Assert.assertEquals(0, 
store.backdoor().getStatsCache().hbaseHits.getCnt());
+    Assert.assertEquals(1, 
store.backdoor().getStatsCache().totalGets.getCnt());
+    Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+  }
+  // TODO test invalidation
+}
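
The allWithStats checker above relies on the per-partition boolean counters being summed (10+10 falses, 20+20 trues, 30+30 nulls). A minimal sketch of that arithmetic follows; the method name is hypothetical and this is not the actual BooleanColumnStatsAggregator implementation, which is not part of this diff:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;

    // Hypothetical helper: add up boolean counters across partitions.
    static BooleanColumnStatsData sumBooleanStats(List<BooleanColumnStatsData> perPartition) {
      // Thrift numeric fields default to 0, so the running totals start at zero.
      BooleanColumnStatsData out = new BooleanColumnStatsData();
      for (BooleanColumnStatsData d : perPartition) {
        out.setNumTrues(out.getNumTrues() + d.getNumTrues());
        out.setNumFalses(out.getNumFalses() + d.getNumFalses());
        out.setNumNulls(out.getNumNulls() + d.getNumNulls());
      }
      return out;
    }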

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java 
b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
index 92c9ba4..9878499 100644
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
@@ -124,7 +124,7 @@ public class TestHBaseStore {
 
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Mock HTableInterface htable;
-  SortedMap<String, Cell> rows = new TreeMap<String, Cell>();
+  SortedMap<String, Cell> rows = new TreeMap<>();
   HBaseStore store;
 
 
