http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
new file mode 100644
index 0000000..a76b848
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
+import 
org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SharedCache {
+  private Map<String, Database> databaseCache = new TreeMap<>();
+  private Map<String, TableWrapper> tableCache = new TreeMap<>();
+  private Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
+  private Map<String, ColumnStatisticsObj> partitionColStatsCache = new 
TreeMap<>();
+  private Map<String, ColumnStatisticsObj> tableColStatsCache = new 
TreeMap<>();
+  private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new 
HashMap<>();
+  private static MessageDigest md;
+
+  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);
+
+  static {
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException("should not happen", e);
+    }
+  }
+
+  public synchronized Database getDatabaseFromCache(String name) {
+    return 
databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
+  }
+
+  public synchronized void addDatabaseToCache(String dbName, Database db) {
+    Database dbCopy = db.deepCopy();
+    dbCopy.setName(StringUtils.normalizeIdentifier(dbName));
+    databaseCache.put(dbName, dbCopy);
+  }
+
+  public synchronized void removeDatabaseFromCache(String dbName) {
+    databaseCache.remove(dbName);
+  }
+
+  public synchronized List<String> listCachedDatabases() {
+    return new ArrayList<>(databaseCache.keySet());
+  }
+
+  public synchronized void alterDatabaseInCache(String dbName, Database newDb) 
{
+    removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
+    addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), 
newDb.deepCopy());
+  }
+
+  public synchronized int getCachedDatabaseCount() {
+    return databaseCache.size();
+  }
+
+  public synchronized Table getTableFromCache(String dbName, String tableName) 
{
+    TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, 
tableName));
+    if (tblWrapper == null) {
+      return null;
+    }
+    Table t = CacheUtils.assemble(tblWrapper, this);
+    return t;
+  }
+
+  public synchronized void addTableToCache(String dbName, String tblName, 
Table tbl) {
+    Table tblCopy = tbl.deepCopy();
+    tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName));
+    tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName));
+    if (tblCopy.getPartitionKeys() != null) {
+      for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+        fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
+      }
+    }
+    TableWrapper wrapper;
+    if (tbl.getSd() != null) {
+      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md);
+      StorageDescriptor sd = tbl.getSd();
+      increSd(sd, sdHash);
+      tblCopy.setSd(null);
+      wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), 
sd.getParameters());
+    } else {
+      wrapper = new TableWrapper(tblCopy, null, null, null);
+    }
+    tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+  }
+
+  public synchronized void removeTableFromCache(String dbName, String tblName) 
{
+    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, 
tblName));
+    byte[] sdHash = tblWrapper.getSdHash();
+    if (sdHash!=null) {
+      decrSd(sdHash);
+    }
+  }
+
+  public synchronized ColumnStatisticsObj getCachedTableColStats(String 
colStatsCacheKey) {
+    return 
tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null;
+  }
+
+  public synchronized void removeTableColStatsFromCache(String dbName, String 
tblName) {
+    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+        tableColStatsCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = iterator.next();
+      String key = entry.getKey();
+      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+        iterator.remove();
+      }
+    }
+  }
+
+  public synchronized void removeTableColStatsFromCache(String dbName, String 
tblName,
+      String colName) {
+    tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+  }
+
+  public synchronized void updateTableColStatsInCache(String dbName, String 
tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+      // Get old stats object if present
+      String key = CacheUtils.buildKey(dbName, tableName, 
colStatObj.getColName());
+      ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+      if (oldStatsObj != null) {
+        LOG.debug("CachedStore: updating table column stats for column: " + 
colStatObj.getColName()
+            + ", of table: " + tableName + " and database: " + dbName);
+        // Update existing stat object's field
+        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+      } else {
+        // No stats exist for this key; add a new object to the cache
+        tableColStatsCache.put(key, colStatObj);
+      }
+    }
+  }
+
+  public synchronized void alterTableInCache(String dbName, String tblName, 
Table newTable) {
+    removeTableFromCache(dbName, tblName);
+    addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
+        StringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+  }
+
+  public synchronized void alterTableInPartitionCache(String dbName, String 
tblName,
+      Table newTable) {
+    if (!dbName.equals(newTable.getDbName()) || 
!tblName.equals(newTable.getTableName())) {
+      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
+      for (Partition part : partitions) {
+        removePartitionFromCache(part.getDbName(), part.getTableName(), 
part.getValues());
+        part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName()));
+        
part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName()));
+        
addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
+            StringUtils.normalizeIdentifier(newTable.getTableName()), part);
+      }
+    }
+  }
+
+  public synchronized void alterTableInTableColStatsCache(String dbName, 
String tblName,
+      Table newTable) {
+    if (!dbName.equals(newTable.getDbName()) || 
!tblName.equals(newTable.getTableName())) {
+      String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, 
tblName);
+      Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+          tableColStatsCache.entrySet().iterator();
+      Map<String, ColumnStatisticsObj> newTableColStats =
+          new HashMap<>();
+      while (iterator.hasNext()) {
+        Entry<String, ColumnStatisticsObj> entry = iterator.next();
+        String key = entry.getKey();
+        ColumnStatisticsObj colStatObj = entry.getValue();
+        if 
(key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
+          String[] decomposedKey = CacheUtils.splitTableColStats(key);
+          String newKey = CacheUtils.buildKey(decomposedKey[0], 
decomposedKey[1], decomposedKey[2]);
+          newTableColStats.put(newKey, colStatObj);
+          iterator.remove();
+        }
+      }
+      tableColStatsCache.putAll(newTableColStats);
+    }
+  }
+
+  public synchronized void alterTableInPartitionColStatsCache(String dbName, 
String tblName,
+      Table newTable) {
+    if (!dbName.equals(newTable.getDbName()) || 
!tblName.equals(newTable.getTableName())) {
+      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
+      Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
+      for (Partition part : partitions) {
+        String oldPartialPartitionKey =
+            CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
+        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+            partitionColStatsCache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Entry<String, ColumnStatisticsObj> entry = iterator.next();
+          String key = entry.getKey();
+          ColumnStatisticsObj colStatObj = entry.getValue();
+          if 
(key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
+            Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
+            String newKey =
+                CacheUtils.buildKey((String) decomposedKey[0], (String) 
decomposedKey[1],
+                    (List<String>) decomposedKey[2], (String) 
decomposedKey[3]);
+            newPartitionColStats.put(newKey, colStatObj);
+            iterator.remove();
+          }
+        }
+      }
+      partitionColStatsCache.putAll(newPartitionColStats);
+    }
+  }
+
+  public synchronized int getCachedTableCount() {
+    return tableCache.size();
+  }
+
+  public synchronized List<Table> listCachedTables(String dbName) {
+    List<Table> tables = new ArrayList<>();
+    for (TableWrapper wrapper : tableCache.values()) {
+      if (wrapper.getTable().getDbName().equals(dbName)) {
+        tables.add(CacheUtils.assemble(wrapper, this));
+      }
+    }
+    return tables;
+  }
+
+  public synchronized List<TableMeta> getTableMeta(String dbNames, String 
tableNames, List<String> tableTypes) {
+    List<TableMeta> tableMetas = new ArrayList<>();
+    for (String dbName : listCachedDatabases()) {
+      if (CacheUtils.matches(dbName, dbNames)) {
+        for (Table table : listCachedTables(dbName)) {
+          if (CacheUtils.matches(table.getTableName(), tableNames)) {
+            if (tableTypes==null || tableTypes.contains(table.getTableType())) 
{
+              TableMeta metaData = new TableMeta(
+                  dbName, table.getTableName(), table.getTableType());
+                metaData.setComments(table.getParameters().get("comment"));
+                tableMetas.add(metaData);
+            }
+          }
+        }
+      }
+    }
+    return tableMetas;
+  }
+
+  public synchronized void addPartitionToCache(String dbName, String tblName, 
Partition part) {
+    Partition partCopy = part.deepCopy();
+    PartitionWrapper wrapper;
+    if (part.getSd()!=null) {
+      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
+      StorageDescriptor sd = part.getSd();
+      increSd(sd, sdHash);
+      partCopy.setSd(null);
+      wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), 
sd.getParameters());
+    } else {
+      wrapper = new PartitionWrapper(partCopy, null, null, null);
+    }
+    partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), 
wrapper);
+  }
+
+  public synchronized Partition getPartitionFromCache(String key) {
+    PartitionWrapper wrapper = partitionCache.get(key);
+    if (wrapper == null) {
+      return null;
+    }
+    Partition p = CacheUtils.assemble(wrapper, this);
+    return p;
+  }
+
+  public synchronized Partition getPartitionFromCache(String dbName, String 
tblName, List<String> part_vals) {
+    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, 
part_vals));
+  }
+
+  public synchronized boolean existPartitionFromCache(String dbName, String 
tblName, List<String> part_vals) {
+    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, 
part_vals));
+  }
+
+  public synchronized Partition removePartitionFromCache(String dbName, String 
tblName,
+      List<String> part_vals) {
+    PartitionWrapper wrapper =
+        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+    if (wrapper.getSdHash() != null) {
+      decrSd(wrapper.getSdHash());
+    }
+    return wrapper.getPartition();
+  }
+
+  // Remove cached column stats for all partitions of a table
+  public synchronized void removePartitionColStatsFromCache(String dbName, 
String tblName) {
+    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+        partitionColStatsCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = iterator.next();
+      String key = entry.getKey();
+      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+        iterator.remove();
+      }
+    }
+  }
+
+  // Remove cached column stats for a particular partition of a table
+  public synchronized void removePartitionColStatsFromCache(String dbName, 
String tblName,
+      List<String> partVals) {
+    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, 
partVals);
+    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+        partitionColStatsCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = iterator.next();
+      String key = entry.getKey();
+      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+        iterator.remove();
+      }
+    }
+  }
+
+  // Remove cached column stats for a particular partition and a particular 
column of a table
+  public synchronized void removePartitionColStatsFromCache(String dbName, 
String tblName,
+      List<String> partVals, String colName) {
+    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, 
partVals, colName));
+  }
+
+  public synchronized List<Partition> listCachedPartitions(String dbName, 
String tblName, int max) {
+    List<Partition> partitions = new ArrayList<>();
+    int count = 0;
+    for (PartitionWrapper wrapper : partitionCache.values()) {
+      if (wrapper.getPartition().getDbName().equals(dbName)
+          && wrapper.getPartition().getTableName().equals(tblName)
+          && (max == -1 || count < max)) {
+        partitions.add(CacheUtils.assemble(wrapper, this));
+        count++;
+      }
+    }
+    return partitions;
+  }
+
+  public synchronized void alterPartitionInCache(String dbName, String tblName,
+      List<String> partVals, Partition newPart) {
+    removePartitionFromCache(dbName, tblName, partVals);
+    addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()),
+        StringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+  }
+
+  public synchronized void alterPartitionInColStatsCache(String dbName, String 
tblName,
+      List<String> partVals, Partition newPart) {
+    String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, 
tblName, partVals);
+    Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
+    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+        partitionColStatsCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = iterator.next();
+      String key = entry.getKey();
+      ColumnStatisticsObj colStatObj = entry.getValue();
+      if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
+        Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
+        String newKey =
+            
CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()),
+                StringUtils.normalizeIdentifier(newPart.getTableName()), 
newPart.getValues(),
+                (String) decomposedKey[3]);
+        newPartitionColStats.put(newKey, colStatObj);
+        iterator.remove();
+      }
+    }
+    partitionColStatsCache.putAll(newPartitionColStats);
+  }
+
+  public synchronized void updatePartitionColStatsInCache(String dbName, 
String tableName,
+      List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
+    for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+      // Get old stats object if present
+      String key = CacheUtils.buildKey(dbName, tableName, partVals, 
colStatObj.getColName());
+      ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+      if (oldStatsObj != null) {
+        // Update existing stat object's field
+        LOG.debug("CachedStore: updating partition column stats for column: "
+            + colStatObj.getColName() + ", of table: " + tableName + " and 
database: " + dbName);
+        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+      } else {
+        // No stats exist for this key; add a new object to the cache
+        partitionColStatsCache.put(key, colStatObj);
+      }
+    }
+  }
+
+  public synchronized int getCachedPartitionCount() {
+    return partitionCache.size();
+  }
+
+  public synchronized ColumnStatisticsObj getCachedPartitionColStats(String 
key) {
+    return 
partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
+  }
+
+  public synchronized void addPartitionColStatsToCache(String dbName, String 
tableName,
+      Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) {
+    for (Map.Entry<String, List<ColumnStatisticsObj>> entry : 
colStatsPerPartition.entrySet()) {
+      String partName = entry.getKey();
+      try {
+        List<String> partVals = Warehouse.getPartValuesFromPartName(partName);
+        for (ColumnStatisticsObj colStatObj : entry.getValue()) {
+          String key = CacheUtils.buildKey(dbName, tableName, partVals, 
colStatObj.getColName());
+          partitionColStatsCache.put(key, colStatObj);
+        }
+      } catch (MetaException e) {
+        LOG.info("Unable to add partition: " + partName + " to SharedCache", 
e);
+      }
+    }
+  }
+
+  public synchronized void refreshPartitionColStats(String dbName, String 
tableName,
+      Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) {
+    LOG.debug("CachedStore: updating cached partition column stats objects for 
database: " + dbName
+        + " and table: " + tableName);
+    removePartitionColStatsFromCache(dbName, tableName);
+    addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition);
+  }
+
+  public synchronized void addTableColStatsToCache(String dbName, String 
tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+      String key = CacheUtils.buildKey(dbName, tableName, 
colStatObj.getColName());
+      tableColStatsCache.put(key, colStatObj);
+    }
+  }
+
+  public synchronized void refreshTableColStats(String dbName, String 
tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    LOG.debug("CachedStore: updating cached table column stats objects for 
database: " + dbName
+        + " and table: " + tableName);
+    // Remove all old cache entries for this table
+    removeTableColStatsFromCache(dbName, tableName);
+    // Add new entries to cache
+    addTableColStatsToCache(dbName, tableName, colStatsForTable);
+  }
+
+  public void increSd(StorageDescriptor sd, byte[] sdHash) {
+    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
+    if (sdCache.containsKey(byteArray)) {
+      sdCache.get(byteArray).refCount++;
+    } else {
+      StorageDescriptor sdToCache = sd.deepCopy();
+      sdToCache.setLocation(null);
+      sdToCache.setParameters(null);
+      sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1));
+    }
+  }
+
+  public void decrSd(byte[] sdHash) {
+    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
+    StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
+    sdWrapper.refCount--;
+    if (sdWrapper.getRefCount() == 0) {
+      sdCache.remove(byteArray);
+    }
+  }
+
+  public StorageDescriptor getSdFromCache(byte[] sdHash) {
+    StorageDescriptorWrapper sdWrapper = sdCache.get(new 
ByteArrayWrapper(sdHash));
+    return sdWrapper.getSd();
+  }
+
+  // Replace databases in databaseCache with the new list
+  public synchronized void refreshDatabases(List<Database> databases) {
+    LOG.debug("CachedStore: updating cached database objects");
+    for (String dbName : listCachedDatabases()) {
+      removeDatabaseFromCache(dbName);
+    }
+    for (Database db : databases) {
+      addDatabaseToCache(db.getName(), db);
+    }
+  }
+
+  // Replace tables in tableCache with the new list
+  public synchronized void refreshTables(String dbName, List<Table> tables) {
+    LOG.debug("CachedStore: updating cached table objects for database: " + 
dbName);
+    for (Table tbl : listCachedTables(dbName)) {
+      removeTableFromCache(dbName, tbl.getTableName());
+    }
+    for (Table tbl : tables) {
+      addTableToCache(dbName, tbl.getTableName(), tbl);
+    }
+  }
+
+  public synchronized void refreshPartitions(String dbName, String tblName,
+      List<Partition> partitions) {
+    LOG.debug("CachedStore: updating cached partition objects for database: " 
+ dbName
+        + " and table: " + tblName);
+    Iterator<Entry<String, PartitionWrapper>> iterator = 
partitionCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      PartitionWrapper partitionWrapper = iterator.next().getValue();
+      if (partitionWrapper.getPartition().getDbName().equals(dbName)
+          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
+        iterator.remove();
+      }
+    }
+    for (Partition part : partitions) {
+      addPartitionToCache(dbName, tblName, part);
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, Database> getDatabaseCache() {
+    return databaseCache;
+  }
+
+  @VisibleForTesting
+  Map<String, TableWrapper> getTableCache() {
+    return tableCache;
+  }
+
+  @VisibleForTesting
+  Map<String, PartitionWrapper> getPartitionCache() {
+    return partitionCache;
+  }
+
+  @VisibleForTesting
+  Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
+    return sdCache;
+  }
+
+  @VisibleForTesting
+  Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
+    return partitionColStatsCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
new file mode 100644
index 0000000..45d5d8c
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {
+
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+    BinaryColumnStatsData aggregateData = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but 
found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, 
colType, cso
+            .getStatsData().getSetField());
+      }
+      BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats();
+      if (aggregateData == null) {
+        aggregateData = newData.deepCopy();
+      } else {
+        aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), 
newData.getMaxColLen()));
+        aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), 
newData.getAvgColLen()));
+        aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    columnStatisticsData.setBinaryStats(aggregateData);
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
new file mode 100644
index 0000000..8aac0fe
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {
+
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+    BooleanColumnStatsData aggregateData = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but 
found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, 
colType, cso
+            .getStatsData().getSetField());
+      }
+      BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats();
+      if (aggregateData == null) {
+        aggregateData = newData.deepCopy();
+      } else {
+        aggregateData.setNumTrues(aggregateData.getNumTrues() + 
newData.getNumTrues());
+        aggregateData.setNumFalses(aggregateData.getNumFalses() + 
newData.getNumFalses());
+        aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    columnStatisticsData.setBooleanStats(aggregateData);
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
new file mode 100644
index 0000000..cd0392d
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
/**
 * Base class for per-type column statistics aggregators. A concrete subclass
 * merges the statistics of a single column, collected from multiple
 * partitions, into one {@link ColumnStatisticsObj}.
 *
 * NOTE(review): the two configuration fields are public and mutable because
 * {@code ColumnStatsAggregatorFactory} assigns them directly after
 * construction; tightening their accessibility would break the factory.
 */
public abstract class ColumnStatsAggregator {
  // When true, subclasses may estimate NDV from the column's value range and
  // average density rather than only from the per-partition NDV counts.
  public boolean useDensityFunctionForNDVEstimation;
  // Tuning factor used by subclasses to interpolate between the NDV lower
  // bound and upper bound — presumably expected in [0, 1]; confirm with callers.
  public double ndvTuner;

  /**
   * Aggregates the statistics of column {@code colName} across partitions.
   *
   * @param colName   name of the column whose statistics are being merged
   * @param partNames names of all partitions for which stats were requested
   * @param css       per-partition statistics; each entry must contain exactly
   *                  one {@link ColumnStatisticsObj}
   * @return the merged statistics object for the column
   * @throws MetaException if the input statistics are malformed
   */
  public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,
      List<ColumnStatistics> css) throws MetaException;
}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
new file mode 100644
index 0000000..7aaab4a
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
+
+public class ColumnStatsAggregatorFactory {
+
+  private ColumnStatsAggregatorFactory() {
+  }
+
+  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) {
+    ColumnStatsAggregator agg;
+    switch (type) {
+    case BOOLEAN_STATS:
+      agg = new BooleanColumnStatsAggregator();
+      break;
+    case LONG_STATS:
+      agg = new LongColumnStatsAggregator();
+      break;
+    case DATE_STATS:
+      agg = new DateColumnStatsAggregator();
+      break;
+    case DOUBLE_STATS:
+      agg = new DoubleColumnStatsAggregator();
+      break;
+    case STRING_STATS:
+      agg = new StringColumnStatsAggregator();
+      break;
+    case BINARY_STATS:
+      agg = new BinaryColumnStatsAggregator();
+      break;
+    case DECIMAL_STATS:
+      agg = new DecimalColumnStatsAggregator();
+      break;
+    default:
+      throw new RuntimeException("Woh, bad.  Unknown stats type " + 
type.toString());
+    }
+    agg.useDensityFunctionForNDVEstimation = 
useDensityFunctionForNDVEstimation;
+    agg.ndvTuner = ndvTuner;
+    return agg;
+  }
+
+  public static ColumnStatisticsObj newColumnStaticsObj(String colName, String 
colType, _Fields type) {
+    ColumnStatisticsObj cso = new ColumnStatisticsObj();
+    ColumnStatisticsData csd = new ColumnStatisticsData();
+    cso.setColName(colName);
+    cso.setColType(colType);
+    switch (type) {
+    case BOOLEAN_STATS:
+      csd.setBooleanStats(new BooleanColumnStatsData());
+      break;
+
+    case LONG_STATS:
+      csd.setLongStats(new LongColumnStatsDataInspector());
+      break;
+
+    case DATE_STATS:
+      csd.setDateStats(new DateColumnStatsDataInspector());
+      break;
+
+    case DOUBLE_STATS:
+      csd.setDoubleStats(new DoubleColumnStatsDataInspector());
+      break;
+
+    case STRING_STATS:
+      csd.setStringStats(new StringColumnStatsDataInspector());
+      break;
+
+    case BINARY_STATS:
+      csd.setBinaryStats(new BinaryColumnStatsData());
+      break;
+
+    case DECIMAL_STATS:
+      csd.setDecimalStats(new DecimalColumnStatsDataInspector());
+      break;
+
+    default:
+      throw new RuntimeException("Woh, bad.  Unknown stats type!");
+    }
+
+    cso.setStatsData(csd);
+    return cso;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
new file mode 100644
index 0000000..7f29561
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Aggregates per-partition {@code DateColumnStatsData} for one column into a
 * single statistics object. When every partition carries a mergeable NDV bit
 * vector, NDVs are merged exactly; otherwise NDV is estimated from bounds or
 * a density function. When some partitions lack stats entirely, low/high/ndv
 * are extrapolated from the partitions that do have them.
 */
public class DateColumnStatsAggregator extends ColumnStatsAggregator implements
    IExtrapolatePartStatus {

  private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class);

  /**
   * Merges the date-column statistics in {@code css} (one entry per partition
   * that has stats) into a single {@link ColumnStatisticsObj}.
   *
   * @param colName   column being aggregated
   * @param partNames all partitions requested (may be more than css.size())
   * @param css       per-partition stats; each must hold exactly one stats obj
   * @throws MetaException if an entry holds more than one column's stats
   */
  @Override
  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
      List<ColumnStatistics> css) throws MetaException {
    ColumnStatisticsObj statsObj = null;

    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
    // bitvectors
    boolean doAllPartitionContainStats = partNames.size() == css.size();
    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
    NumDistinctValueEstimator ndvEstimator = null;
    String colType = null;
    // First pass: validate input shape, lazily create the result object, and
    // decide whether every partition has a mergeable NDV estimator. Any
    // partition without one (or with an incompatible one) disables exact
    // NDV merging by nulling ndvEstimator.
    for (ColumnStatistics cs : css) {
      if (cs.getStatsObjSize() != 1) {
        throw new MetaException(
            "The number of columns should be exactly one in aggrStats, but found "
                + cs.getStatsObjSize());
      }
      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
      if (statsObj == null) {
        colType = cso.getColType();
        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
            .getStatsData().getSetField());
      }
      DateColumnStatsDataInspector dateColumnStats =
          (DateColumnStatsDataInspector) cso.getStatsData().getDateStats();
      if (dateColumnStats.getNdvEstimator() == null) {
        ndvEstimator = null;
        break;
      } else {
        // check if all of the bit vectors can merge
        NumDistinctValueEstimator estimator = dateColumnStats.getNdvEstimator();
        if (ndvEstimator == null) {
          ndvEstimator = estimator;
        } else {
          if (ndvEstimator.canMerge(estimator)) {
            continue;
          } else {
            ndvEstimator = null;
            break;
          }
        }
      }
    }
    if (ndvEstimator != null) {
      // Replace the borrowed estimator with an empty one of the same kind so
      // merging below starts from a clean accumulator.
      ndvEstimator = NumDistinctValueEstimatorFactory
          .getEmptyNumDistinctValueEstimator(ndvEstimator);
    }
    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    if (doAllPartitionContainStats || css.size() < 2) {
      // Simple case: stats exist for all requested partitions (or there is at
      // most one), so merge directly with no extrapolation.
      DateColumnStatsDataInspector aggregateData = null;
      long lowerBound = 0;   // max of per-partition NDVs
      long higherBound = 0;  // sum of per-partition NDVs
      double densityAvgSum = 0.0;
      for (ColumnStatistics cs : css) {
        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
        DateColumnStatsDataInspector newData =
            (DateColumnStatsDataInspector) cso.getStatsData().getDateStats();
        lowerBound = Math.max(lowerBound, newData.getNumDVs());
        higherBound += newData.getNumDVs();
        // NOTE(review): long/long division truncates before the double add,
        // and a zero NumDVs would divide by zero — confirm inputs guarantee
        // NumDVs >= 1.
        densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue()))
            / newData.getNumDVs();
        if (ndvEstimator != null) {
          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
        }
        if (aggregateData == null) {
          aggregateData = newData.deepCopy();
        } else {
          aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
          aggregateData
              .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
        }
      }
      if (ndvEstimator != null) {
        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
        // use uniform distribution assumption because we can merge bitvectors
        // to get a good estimation.
        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
      } else {
        long estimation;
        if (useDensityFunctionForNDVEstimation) {
          // We have estimation, lowerbound and higherbound. We use estimation
          // if it is between lowerbound and higherbound.
          double densityAvg = densityAvgSum / partNames.size();
          estimation = (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg);
          if (estimation < lowerBound) {
            estimation = lowerBound;
          } else if (estimation > higherBound) {
            estimation = higherBound;
          }
        } else {
          // Interpolate between the bounds using the configured tuner.
          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
        }
        aggregateData.setNumDVs(estimation);
      }
      columnStatisticsData.setDateStats(aggregateData);
    } else {
      // we need extrapolation
      LOG.debug("start extrapolation for " + colName);

      // Map each requested partition name to its ordinal position.
      Map<String, Integer> indexMap = new HashMap<>();
      for (int index = 0; index < partNames.size(); index++) {
        indexMap.put(partNames.get(index), index);
      }
      Map<String, Double> adjustedIndexMap = new HashMap<>();
      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
      // while we scan the css, we also get the densityAvg, lowerbound and
      // higerbound when useDensityFunctionForNDVEstimation is true.
      double densityAvgSum = 0.0;
      if (ndvEstimator == null) {
        // if not every partition uses bitvector for ndv, we just fall back to
        // the traditional extrapolation methods.
        for (ColumnStatistics cs : css) {
          String partName = cs.getStatsDesc().getPartName();
          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
          DateColumnStatsData newData = cso.getStatsData().getDateStats();
          if (useDensityFunctionForNDVEstimation) {
            // NOTE(review): same truncating long division as above — confirm.
            densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs();
          }
          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
          adjustedStatsMap.put(partName, cso.getStatsData());
        }
      } else {
        // we first merge all the adjacent bitvectors that we could merge and
        // derive new partition names and index.
        StringBuilder pseudoPartName = new StringBuilder();
        double pseudoIndexSum = 0;
        int length = 0;       // number of partitions merged into the pseudo one
        int curIndex = -1;    // expected index of the next adjacent partition
        DateColumnStatsDataInspector aggregateData = null;
        for (ColumnStatistics cs : css) {
          String partName = cs.getStatsDesc().getPartName();
          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
          DateColumnStatsDataInspector newData =
              (DateColumnStatsDataInspector) cso.getStatsData().getDateStats();
          // newData.isSetBitVectors() should be true for sure because we
          // already checked it before.
          if (indexMap.get(partName) != curIndex) {
            // There is bitvector, but it is not adjacent to the previous ones.
            if (length > 0) {
              // we have to set ndv
              // Flush the accumulated pseudo-partition: its index is the mean
              // of the member indexes, its NDV comes from the merged estimator.
              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
              ColumnStatisticsData csd = new ColumnStatisticsData();
              csd.setDateStats(aggregateData);
              adjustedStatsMap.put(pseudoPartName.toString(), csd);
              if (useDensityFunctionForNDVEstimation) {
                densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
                    / aggregateData.getNumDVs();
              }
              // reset everything
              pseudoPartName = new StringBuilder();
              pseudoIndexSum = 0;
              length = 0;
              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
            }
            aggregateData = null;
          }
          curIndex = indexMap.get(partName);
          pseudoPartName.append(partName);
          pseudoIndexSum += curIndex;
          length++;
          curIndex++;
          if (aggregateData == null) {
            aggregateData = newData.deepCopy();
          } else {
            aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
            aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
          }
          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
        }
        if (length > 0) {
          // we have to set ndv
          // Flush the final pseudo-partition left in the accumulator.
          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
          ColumnStatisticsData csd = new ColumnStatisticsData();
          csd.setDateStats(aggregateData);
          adjustedStatsMap.put(pseudoPartName.toString(), csd);
          if (useDensityFunctionForNDVEstimation) {
            densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
                / aggregateData.getNumDVs();
          }
        }
      }
      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
    }
    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName,
        columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size());
    statsObj.setStatsData(columnStatisticsData);
    return statsObj;
  }

  /** Difference between two dates in days (d1 - d2). */
  private long diff(Date d1, Date d2) {
    return d1.getDaysSinceEpoch() - d2.getDaysSinceEpoch();
  }

  /** Earlier of two dates. */
  private Date min(Date d1, Date d2) {
    return d1.compareTo(d2) < 0 ? d1 : d2;
  }

  /** Later of two dates. */
  private Date max(Date d1, Date d2) {
    return d1.compareTo(d2) < 0 ? d2 : d1;
  }

  /**
   * Linearly extrapolates lowValue, highValue, numNulls and numDVs from the
   * partitions that have stats to the full partition range [0, numParts).
   *
   * @param extrapolateData     output: receives the extrapolated date stats
   * @param numParts            total number of partitions requested
   * @param numPartsWithStats   number of partitions that actually had stats
   * @param adjustedIndexMap    (pseudo-)partition name -> positional index
   * @param adjustedStatsMap    (pseudo-)partition name -> its stats data
   * @param densityAvg          average (high-low)/ndv density across partitions
   */
  @Override
  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
    int rightBorderInd = numParts;
    DateColumnStatsDataInspector extrapolateDateData = new DateColumnStatsDataInspector();
    Map<String, DateColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDateStats());
    }
    List<Map.Entry<String, DateColumnStatsData>> list = new LinkedList<>(
        extractedAdjustedStatsMap.entrySet());
    // get the lowValue
    // Sort by per-partition lowValue, then project the trend line to the
    // partition-range border that holds the minimum.
    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
      @Override
      public int compare(Map.Entry<String, DateColumnStatsData> o1,
          Map.Entry<String, DateColumnStatsData> o2) {
        return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
      }
    });
    double minInd = adjustedIndexMap.get(list.get(0).getKey());
    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    long lowValue = 0;
    long min = list.get(0).getValue().getLowValue().getDaysSinceEpoch();
    long max = list.get(list.size() - 1).getValue().getLowValue().getDaysSinceEpoch();
    if (minInd == maxInd) {
      lowValue = min;
    } else if (minInd < maxInd) {
      // left border is the min
      lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd));
    } else {
      // right border is the min
      lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
    }

    // get the highValue
    // Same projection, now sorted by highValue.
    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
      @Override
      public int compare(Map.Entry<String, DateColumnStatsData> o1,
          Map.Entry<String, DateColumnStatsData> o2) {
        return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
      }
    });
    minInd = adjustedIndexMap.get(list.get(0).getKey());
    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    long highValue = 0;
    min = list.get(0).getValue().getHighValue().getDaysSinceEpoch();
    max = list.get(list.size() - 1).getValue().getHighValue().getDaysSinceEpoch();
    if (minInd == maxInd) {
      highValue = min;
    } else if (minInd < maxInd) {
      // right border is the max
      highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    } else {
      // left border is the max
      highValue = (long) (min + (max - min) * minInd / (minInd - maxInd));
    }

    // get the #nulls
    long numNulls = 0;
    for (Map.Entry<String, DateColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
      numNulls += entry.getValue().getNumNulls();
    }
    // we scale up sumNulls based on the number of partitions
    numNulls = numNulls * numParts / numPartsWithStats;

    // get the ndv
    long ndv = 0;
    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
      @Override
      public int compare(Map.Entry<String, DateColumnStatsData> o1,
          Map.Entry<String, DateColumnStatsData> o2) {
        return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs());
      }
    });
    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
    long higherBound = 0;
    for (Map.Entry<String, DateColumnStatsData> entry : list) {
      higherBound += entry.getValue().getNumDVs();
    }
    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
      // Density-based estimate, clamped into [lowerBound, higherBound].
      ndv = (long) ((highValue - lowValue) / densityAvg);
      if (ndv < lowerBound) {
        ndv = lowerBound;
      } else if (ndv > higherBound) {
        ndv = higherBound;
      }
    } else {
      // Fall back to the same linear projection used for high/low values.
      minInd = adjustedIndexMap.get(list.get(0).getKey());
      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
      min = list.get(0).getValue().getNumDVs();
      max = list.get(list.size() - 1).getValue().getNumDVs();
      if (minInd == maxInd) {
        ndv = min;
      } else if (minInd < maxInd) {
        // right border is the max
        ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
      } else {
        // left border is the max
        ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
      }
    }
    extrapolateDateData.setLowValue(new Date(lowValue));
    extrapolateDateData.setHighValue(new Date(highValue));
    extrapolateDateData.setNumNulls(numNulls);
    extrapolateDateData.setNumDVs(ndv);
    extrapolateData.setDateStats(extrapolateDateData);
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
new file mode 100644
index 0000000..05c0280
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DecimalColumnStatsAggregator extends ColumnStatsAggregator 
implements
+    IExtrapolatePartStatus {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DecimalColumnStatsAggregator.class);
+
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + 
doAllPartitionContainStats);
+    NumDistinctValueEstimator ndvEstimator = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but 
found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, 
colType, cso
+            .getStatsData().getSetField());
+      }
+      DecimalColumnStatsDataInspector decimalColumnStatsData =
+          (DecimalColumnStatsDataInspector) 
cso.getStatsData().getDecimalStats();
+      if (decimalColumnStatsData.getNdvEstimator() == null) {
+        ndvEstimator = null;
+        break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = 
decimalColumnStatsData.getNdvEstimator();
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
+      }
+    }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + 
(ndvEstimator != null));
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      DecimalColumnStatsDataInspector aggregateData = null;
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        DecimalColumnStatsDataInspector newData =
+            (DecimalColumnStatsDataInspector) 
cso.getStatsData().getDecimalStats();
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        densityAvgSum += 
(MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils
+            .decimalToDouble(newData.getLowValue())) / newData.getNumDVs();
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < 
MetaStoreUtils
+              .decimalToDouble(newData.getLowValue())) {
+            aggregateData.setLowValue(aggregateData.getLowValue());
+          } else {
+            aggregateData.setLowValue(newData.getLowValue());
+          }
+          if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > 
MetaStoreUtils
+              .decimalToDouble(newData.getHighValue())) {
+            aggregateData.setHighValue(aggregateData.getHighValue());
+          } else {
+            aggregateData.setHighValue(newData.getHighValue());
+          }
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), 
newData.getNumDVs()));
+        }
+      }
+      if (ndvEstimator != null) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need 
to
+        // use uniform distribution assumption because we can merge bitvectors
+        // to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        long estimation;
+        if (useDensityFunctionForNDVEstimation) {
+          // We have estimation, lowerbound and higherbound. We use estimation
+          // if it is between lowerbound and higherbound.
+          double densityAvg = densityAvgSum / partNames.size();
+          estimation = (long) 
((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
+              .decimalToDouble(aggregateData.getLowValue())) / densityAvg);
+          if (estimation < lowerBound) {
+            estimation = lowerBound;
+          } else if (estimation > higherBound) {
+            estimation = higherBound;
+          }
+        } else {
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * 
ndvTuner);
+        }
+        aggregateData.setNumDVs(estimation);
+      }
+      columnStatisticsData.setDecimalStats(aggregateData);
+    } else {
+      // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
+      Map<String, Integer> indexMap = new HashMap<>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
+      // while we scan the css, we also get the densityAvg, lowerbound and
+      // higerbound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (ndvEstimator == null) {
+        // if not every partition uses bitvector for ndv, we just fall back to
+        // the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DecimalColumnStatsData newData = 
cso.getStatsData().getDecimalStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += 
(MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils
+                .decimalToDouble(newData.getLowValue())) / newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all the adjacent bitvectors that we could merge and
+        // derive new partition names and index.
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        DecimalColumnStatsDataInspector aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DecimalColumnStatsDataInspector newData =
+              (DecimalColumnStatsDataInspector) 
cso.getStatsData().getDecimalStats();
+          // newData.isSetBitVectors() should be true for sure because we
+          // already checked it before.
+          if (indexMap.get(partName) != curIndex) {
+            // There is bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
+              
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setDecimalStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += 
(MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
+                    .decimalToDouble(aggregateData.getLowValue())) / 
aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+              ndvEstimator = 
NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < 
MetaStoreUtils
+                .decimalToDouble(newData.getLowValue())) {
+              aggregateData.setLowValue(aggregateData.getLowValue());
+            } else {
+              aggregateData.setLowValue(newData.getLowValue());
+            }
+            if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > 
MetaStoreUtils
+                .decimalToDouble(newData.getHighValue())) {
+              aggregateData.setHighValue(aggregateData.getHighValue());
+            } else {
+              aggregateData.setHighValue(newData.getHighValue());
+            }
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setDecimalStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += 
(MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
+                .decimalToDouble(aggregateData.getLowValue())) / 
aggregateData.getNumDVs();
+          }
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), 
adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+    }
+    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # 
of partitions found: {}", colName,
+        columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), 
css.size());
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    DecimalColumnStatsDataInspector extrapolateDecimalData = new 
DecimalColumnStatsDataInspector();
+    Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new 
HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : 
adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), 
entry.getValue().getDecimalStats());
+    }
+    List<Map.Entry<String, DecimalColumnStatsData>> list = new LinkedList<>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the lowValue
+    Collections.sort(list, new Comparator<Map.Entry<String, 
DecimalColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+          Map.Entry<String, DecimalColumnStatsData> o2) {
+        return 
o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double lowValue = 0;
+    double min = 
MetaStoreUtils.decimalToDouble(list.get(0).getValue().getLowValue());
+    double max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 
1).getValue().getLowValue());
+    if (minInd == maxInd) {
+      lowValue = min;
+    } else if (minInd < maxInd) {
+      // left border is the min
+      lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
+    } else {
+      // right border is the min
+      lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - 
maxInd));
+    }
+
+    // get the highValue
+    Collections.sort(list, new Comparator<Map.Entry<String, 
DecimalColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+          Map.Entry<String, DecimalColumnStatsData> o2) {
+        return 
o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double highValue = 0;
+    min = 
MetaStoreUtils.decimalToDouble(list.get(0).getValue().getHighValue());
+    max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 
1).getValue().getHighValue());
+    if (minInd == maxInd) {
+      highValue = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - 
minInd));
+    } else {
+      // left border is the max
+      highValue = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, DecimalColumnStatsData> entry : 
extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up sumNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    long ndvMin = 0;
+    long ndvMax = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, 
DecimalColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+          Map.Entry<String, DecimalColumnStatsData> o2) {
+        return Long.compare(o1.getValue().getNumDVs(), 
o2.getValue().getNumDVs());
+      }
+    });
+    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, DecimalColumnStatsData> entry : list) {
+      higherBound += entry.getValue().getNumDVs();
+    }
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      ndv = (long) ((highValue - lowValue) / densityAvg);
+      if (ndv < lowerBound) {
+        ndv = lowerBound;
+      } else if (ndv > higherBound) {
+        ndv = higherBound;
+      }
+    } else {
+      minInd = adjustedIndexMap.get(list.get(0).getKey());
+      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+      ndvMin = list.get(0).getValue().getNumDVs();
+      ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
+      if (minInd == maxInd) {
+        ndv = ndvMin;
+      } else if (minInd < maxInd) {
+        // right border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / 
(maxInd - minInd));
+      } else {
+        // left border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
+      }
+    }
+    
extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String
+        .valueOf(lowValue)));
+    
extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String
+        .valueOf(highValue)));
+    extrapolateDecimalData.setNumNulls(numNulls);
+    extrapolateDecimalData.setNumDVs(ndv);
+    extrapolateData.setDecimalStats(extrapolateDecimalData);
+  }
+}

Reply via email to