http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
new file mode 100644
index 0000000..faf22dc
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DoubleColumnStatsAggregator extends ColumnStatsAggregator 
implements
+    IExtrapolatePartStatus {
+
+  // Logger for this aggregator. It must be keyed on DoubleColumnStatsAggregator:
+  // using LongColumnStatsAggregator.class here (copy/paste) files every message
+  // from this class under the wrong logger category.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DoubleColumnStatsAggregator.class);
+
+  /**
+   * Aggregates the per-partition double column statistics in {@code css} into a
+   * single {@link ColumnStatisticsObj} for {@code colName}.
+   *
+   * If {@code css} has as many entries as {@code partNames} (or fewer than two
+   * entries) the statistics are merged directly: low/high values via min/max,
+   * null counts summed, and the NDV taken from the merged NDV estimators when
+   * every partition has a mergeable one, otherwise estimated between the largest
+   * per-partition NDV and the sum of all per-partition NDVs. In every other case
+   * the result is produced by {@code extrapolate}.
+   *
+   * NOTE(review): useDensityFunctionForNDVEstimation and ndvTuner are not
+   * declared in this class; presumably inherited from ColumnStatsAggregator -
+   * confirm.
+   *
+   * @param colName name of the column being aggregated
+   * @param partNames names of all requested partitions; a name's position in
+   *          this list is used as its partition index during extrapolation
+   * @param css per-partition statistics; each inspected entry must hold exactly
+   *          one ColumnStatisticsObj
+   * @return the aggregated statistics object
+   * @throws MetaException if an inspected entry of {@code css} does not hold
+   *           exactly one statistics object
+   */
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + 
doAllPartitionContainStats);
+    NumDistinctValueEstimator ndvEstimator = null;
+    String colType = null;
+    // First pass: create the result object from the first entry and find out
+    // whether every partition carries an NDV estimator that can be merged with
+    // the first one. ndvEstimator ends up null as soon as one partition has no
+    // estimator or an estimator that cannot be merged.
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but 
found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, 
colType, cso
+            .getStatsData().getSetField());
+      }
+      DoubleColumnStatsDataInspector doubleColumnStatsData =
+          (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats();
+      if (doubleColumnStatsData.getNdvEstimator() == null) {
+        ndvEstimator = null;
+        break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = 
doubleColumnStatsData.getNdvEstimator();
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
+      }
+    }
+    // The estimator kept so far belongs to the first partition. Swap it for the
+    // one returned by getEmptyNumDistinctValueEstimator (presumably an empty
+    // estimator of the same kind - confirm) so merging below starts fresh.
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + 
(ndvEstimator != null));
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      DoubleColumnStatsDataInspector aggregateData = null;
+      // lowerBound: largest per-partition NDV; higherBound: sum of all
+      // per-partition NDVs; densityAvgSum: sum of (high - low) / NDV.
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        DoubleColumnStatsDataInspector newData =
+            (DoubleColumnStatsDataInspector) 
cso.getStatsData().getDoubleStats();
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        // Accumulated unconditionally, even when the density function is not
+        // used below. NOTE(review): the range looks like a double, so an NDV of
+        // 0 would yield Infinity/NaN here rather than an exception - confirm.
+        densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / 
newData.getNumDVs();
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), 
newData.getLowValue()));
+          aggregateData
+              .setHighValue(Math.max(aggregateData.getHighValue(), 
newData.getHighValue()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), 
newData.getNumDVs()));
+        }
+      }
+      if (ndvEstimator != null) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+        // use uniform distribution assumption because we can merge bitvectors
+        // to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        long estimation;
+        if (useDensityFunctionForNDVEstimation) {
+          // We have estimation, lowerbound and higherbound. We use estimation
+          // if it is between lowerbound and higherbound.
+          double densityAvg = densityAvgSum / partNames.size();
+          estimation = (long) ((aggregateData.getHighValue() - 
aggregateData.getLowValue()) / densityAvg);
+          if (estimation < lowerBound) {
+            estimation = lowerBound;
+          } else if (estimation > higherBound) {
+            estimation = higherBound;
+          }
+        } else {
+          // Interpolate between the two bounds with the ndvTuner factor.
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * 
ndvTuner);
+        }
+        aggregateData.setNumDVs(estimation);
+      }
+      columnStatisticsData.setDoubleStats(aggregateData);
+    } else {
+      // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
+      // indexMap: partition name -> its position in partNames.
+      Map<String, Integer> indexMap = new HashMap<>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      // The "adjusted" maps are keyed either by a real partition name or, when
+      // estimators are merged, by the concatenated names of a run of adjacent
+      // partitions.
+      Map<String, Double> adjustedIndexMap = new HashMap<>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
+      // while we scan the css, we also get the densityAvg, lowerbound and
+      // higherbound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (ndvEstimator == null) {
+        // if not every partition uses bitvector for ndv, we just fall back to
+        // the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (newData.getHighValue() - newData.getLowValue()) 
/ newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all the adjacent bitvectors that we could merge and
+        // derive new partition names and index.
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        // curIndex is the partition index expected next if the current run of
+        // adjacent partitions continues; -1 means no run has started yet.
+        int curIndex = -1;
+        DoubleColumnStatsData aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DoubleColumnStatsDataInspector newData =
+              (DoubleColumnStatsDataInspector) 
cso.getStatsData().getDoubleStats();
+          // newData.isSetBitVectors() should be true for sure because we
+          // already checked it before.
+          // NOTE(review): indexMap.get() is unboxed for this comparison, so a
+          // partName that is missing from partNames would throw a
+          // NullPointerException here.
+          if (indexMap.get(partName) != curIndex) {
+            // There is bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // Flush the finished run as one pseudo partition whose index is
+              // the average of the indexes it covers.
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
+              
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setDoubleStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += (aggregateData.getHighValue() - 
aggregateData.getLowValue()) / aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+              ndvEstimator = 
NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
+            }
+            aggregateData = null;
+          }
+          // Add the current partition to the run and advance the expected index.
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), 
newData.getLowValue()));
+            aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
+                newData.getHighValue()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        // Flush the last run, which the loop above never closes.
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setDoubleStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (aggregateData.getHighValue() - 
aggregateData.getLowValue()) / aggregateData.getNumDVs();
+          }
+        }
+      }
+      // The density average is taken over the adjusted (possibly merged)
+      // entries, not over the original partitions.
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), 
adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+    }
+    LOG.debug("Ndv estimatation for {} is {}. # of partitions requested: {}. # 
of partitions found: {}", colName,
+        columnStatisticsData.getDoubleStats().getNumDVs(),partNames.size(), 
css.size());
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  /**
+   * Fills {@code extrapolateData} with double statistics extrapolated from the
+   * entries of {@code adjustedStatsMap}.
+   *
+   * The low value, the high value and (when the density estimate is not used)
+   * the NDV are each extrapolated linearly from the two entries holding the
+   * smallest and the largest value of that statistic, out to partition index 0
+   * or to index {@code numParts}. The null count is the sum over all entries
+   * scaled by {@code numParts / numPartsWithStats}.
+   *
+   * @param extrapolateData receives the extrapolated stats via setDoubleStats
+   * @param numParts total number of partitions; also used as the right border
+   *          index
+   * @param numPartsWithStats number of partitions that have stats
+   * @param adjustedIndexMap entry name to (possibly fractional) partition index
+   * @param adjustedStatsMap entry name to its statistics; must not be empty,
+   *          because the first and last sorted entries are read unconditionally
+   * @param densityAvg average NDV density; only used when
+   *          useDensityFunctionForNDVEstimation is true and it is not 0.0
+   */
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    DoubleColumnStatsDataInspector extrapolateDoubleData = new 
DoubleColumnStatsDataInspector();
+    // Unwrap the double stats from each ColumnStatisticsData.
+    Map<String, DoubleColumnStatsData> extractedAdjustedStatsMap = new 
HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : 
adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), 
entry.getValue().getDoubleStats());
+    }
+    // The same list is re-sorted three times below (by low value, by high
+    // value, by NDV); each section reads the first and last entries of the
+    // current ordering.
+    List<Map.Entry<String, DoubleColumnStatsData>> list = new LinkedList<>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the lowValue
+    Collections.sort(list, new Comparator<Map.Entry<String, 
DoubleColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+          Map.Entry<String, DoubleColumnStatsData> o2) {
+        return Double.compare(o1.getValue().getLowValue(), 
o2.getValue().getLowValue());
+      }
+    });
+    // minInd/maxInd: partition indexes of the entries with the smallest and
+    // the largest low value.
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double lowValue = 0;
+    double min = list.get(0).getValue().getLowValue();
+    double max = list.get(list.size() - 1).getValue().getLowValue();
+    if (minInd == maxInd) {
+      lowValue = min;
+    } else if (minInd < maxInd) {
+      // left border is the min
+      // (the line through both points, evaluated at index 0)
+      lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
+    } else {
+      // right border is the min
+      // (the line through both points, evaluated at index rightBorderInd)
+      lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - 
maxInd));
+    }
+
+    // get the highValue
+    Collections.sort(list, new Comparator<Map.Entry<String, 
DoubleColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+          Map.Entry<String, DoubleColumnStatsData> o2) {
+        return Double.compare(o1.getValue().getHighValue(), 
o2.getValue().getHighValue());
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double highValue = 0;
+    min = list.get(0).getValue().getHighValue();
+    max = list.get(list.size() - 1).getValue().getHighValue();
+    if (minInd == maxInd) {
+      highValue = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - 
minInd));
+    } else {
+      // left border is the max
+      highValue = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, DoubleColumnStatsData> entry : 
extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up numNulls based on the number of partitions
+    // (long arithmetic: multiply first, then integer-divide)
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    long ndvMin = 0;
+    long ndvMax = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, 
DoubleColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+          Map.Entry<String, DoubleColumnStatsData> o2) {
+        return Long.compare(o1.getValue().getNumDVs(), 
o2.getValue().getNumDVs());
+      }
+    });
+    // lowerBound: the largest NDV of any entry (last after the sort);
+    // higherBound: the sum of all entries' NDVs.
+    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, DoubleColumnStatsData> entry : list) {
+      higherBound += entry.getValue().getNumDVs();
+    }
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      // Density estimate: extrapolated value range divided by the average
+      // density, clamped to [lowerBound, higherBound].
+      ndv = (long) ((highValue - lowValue) / densityAvg);
+      if (ndv < lowerBound) {
+        ndv = lowerBound;
+      } else if (ndv > higherBound) {
+        ndv = higherBound;
+      }
+    } else {
+      // Linear extrapolation of the NDV, same scheme as for the high value.
+      minInd = adjustedIndexMap.get(list.get(0).getKey());
+      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+      ndvMin = list.get(0).getValue().getNumDVs();
+      ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
+      if (minInd == maxInd) {
+        ndv = ndvMin;
+      } else if (minInd < maxInd) {
+        // right border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / 
(maxInd - minInd));
+      } else {
+        // left border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
+      }
+    }
+    extrapolateDoubleData.setLowValue(lowValue);
+    extrapolateDoubleData.setHighValue(highValue);
+    extrapolateDoubleData.setNumNulls(numNulls);
+    extrapolateDoubleData.setNumDVs(ndv);
+    extrapolateData.setDoubleStats(extrapolateDoubleData);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/IExtrapolatePartStatus.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/IExtrapolatePartStatus.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/IExtrapolatePartStatus.java
new file mode 100644
index 0000000..98a121b
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/IExtrapolatePartStatus.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+
+/**
+ * Implemented by column stats aggregators that can extrapolate statistics
+ * when the column stats of some partitions are missing.
+ */
+public interface IExtrapolatePartStatus {
+  /**
+   * Extrapolates the stats when the column stats of some partitions are
+   * missing.
+   *
+   * @param extrapolateData
+   *          it will carry back the specific stats, e.g., DOUBLE_STATS or
+   *          LONG_STATS
+   * @param numParts
+   *          the total number of partitions
+   * @param numPartsWithStats
+   *          the number of partitions that have stats
+   * @param adjustedIndexMap
+   *          the partition name to index map
+   * @param adjustedStatsMap
+   *          the partition name to its stats map
+   * @param densityAvg
+   *          the average of ndv density, which is useful when
+   *          useDensityFunctionForNDVEstimation is true.
+   */
+  void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
new file mode 100644
index 0000000..d12cdc0
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LongColumnStatsAggregator.class);
+
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + 
doAllPartitionContainStats);
+    NumDistinctValueEstimator ndvEstimator = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but 
found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, 
colType, cso
+            .getStatsData().getSetField());
+      }
+      LongColumnStatsDataInspector longColumnStatsData =
+          (LongColumnStatsDataInspector) cso.getStatsData().getLongStats();
+      if (longColumnStatsData.getNdvEstimator() == null) {
+        ndvEstimator = null;
+        break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = 
longColumnStatsData.getNdvEstimator();
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
+      }
+    }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + 
(ndvEstimator != null));
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      LongColumnStatsDataInspector aggregateData = null;
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        LongColumnStatsDataInspector newData =
+            (LongColumnStatsDataInspector) cso.getStatsData().getLongStats();
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / 
newData.getNumDVs();
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), 
newData.getLowValue()));
+          aggregateData
+              .setHighValue(Math.max(aggregateData.getHighValue(), 
newData.getHighValue()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), 
newData.getNumDVs()));
+        }
+      }
+      if (ndvEstimator != null) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need 
to
+        // use uniform distribution assumption because we can merge bitvectors
+        // to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        long estimation;
+        if (useDensityFunctionForNDVEstimation) {
+          // We have estimation, lowerbound and higherbound. We use estimation
+          // if it is between lowerbound and higherbound.
+          double densityAvg = densityAvgSum / partNames.size();
+          estimation = (long) ((aggregateData.getHighValue() - 
aggregateData.getLowValue()) / densityAvg);
+          if (estimation < lowerBound) {
+            estimation = lowerBound;
+          } else if (estimation > higherBound) {
+            estimation = higherBound;
+          }
+        } else {
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * 
ndvTuner);
+        }
+        aggregateData.setNumDVs(estimation);
+      }
+      columnStatisticsData.setLongStats(aggregateData);
+    } else {
+      // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
+
+      Map<String, Integer> indexMap = new HashMap<>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
+      // while we scan the css, we also get the densityAvg, lowerbound and
+      // higerbound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (ndvEstimator == null) {
+        // if not every partition uses bitvector for ndv, we just fall back to
+        // the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          LongColumnStatsData newData = cso.getStatsData().getLongStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (newData.getHighValue() - newData.getLowValue()) 
/ newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all the adjacent bitvectors that we could merge and
+        // derive new partition names and index.
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        LongColumnStatsDataInspector aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          LongColumnStatsDataInspector newData =
+              (LongColumnStatsDataInspector) cso.getStatsData().getLongStats();
+          // newData.isSetBitVectors() should be true for sure because we
+          // already checked it before.
+          if (indexMap.get(partName) != curIndex) {
+            // There is bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
+              
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setLongStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += (aggregateData.getHighValue() - 
aggregateData.getLowValue()) / aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+              ndvEstimator = 
NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), 
newData.getLowValue()));
+            aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
+                newData.getHighValue()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setLongStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (aggregateData.getHighValue() - 
aggregateData.getLowValue()) / aggregateData.getNumDVs();
+          }
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), 
adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+    }
+    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # 
of partitions found: {}", colName,
+        columnStatisticsData.getLongStats().getNumDVs(),partNames.size(), 
css.size());
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  /**
+   * Estimates long column statistics for the whole partition set when only some of
+   * the requested partitions carry statistics. Low value, high value and (optionally)
+   * NDV are extrapolated linearly along the adjusted partition index; the null count
+   * is scaled by the ratio of requested partitions to partitions with stats.
+   *
+   * @param extrapolateData receives the result through {@code setLongStats}
+   * @param numParts number of partitions requested; also used as the right border index
+   * @param numPartsWithStats number of partitions that actually have statistics
+   * @param adjustedIndexMap (pseudo) partition name to its index position
+   * @param adjustedStatsMap (pseudo) partition name to its statistics; must not be empty
+   * @param densityAvg average density; only used when density-based NDV estimation is
+   *          enabled and the value is not 0.0
+   */
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    final int rightBorder = numParts;
+
+    // Unwrap the long-specific statistics of every (pseudo) partition.
+    Map<String, LongColumnStatsData> longStatsByPart = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> e : adjustedStatsMap.entrySet()) {
+      longStatsByPart.put(e.getKey(), e.getValue().getLongStats());
+    }
+    List<Map.Entry<String, LongColumnStatsData>> entries =
+        new LinkedList<>(longStatsByPart.entrySet());
+    final int last = entries.size() - 1;
+
+    // ---- low value: order by lowValue, then extend the line through both extremes
+    Collections.sort(entries, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, LongColumnStatsData> a,
+          Map.Entry<String, LongColumnStatsData> b) {
+        return Long.compare(a.getValue().getLowValue(), b.getValue().getLowValue());
+      }
+    });
+    double idxOfSmallest = adjustedIndexMap.get(entries.get(0).getKey());
+    double idxOfLargest = adjustedIndexMap.get(entries.get(last).getKey());
+    long smallest = entries.get(0).getValue().getLowValue();
+    long largest = entries.get(last).getValue().getLowValue();
+    final long lowValue;
+    if (idxOfSmallest == idxOfLargest) {
+      lowValue = smallest;
+    } else if (idxOfSmallest < idxOfLargest) {
+      // values shrink towards the left, so the minimum sits on the left border
+      lowValue = (long) (largest
+          - (largest - smallest) * idxOfLargest / (idxOfLargest - idxOfSmallest));
+    } else {
+      // values shrink towards the right, so the minimum sits on the right border
+      lowValue = (long) (largest
+          - (largest - smallest) * (rightBorder - idxOfLargest) / (idxOfSmallest - idxOfLargest));
+    }
+
+    // ---- high value: same idea, ordered by highValue
+    Collections.sort(entries, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, LongColumnStatsData> a,
+          Map.Entry<String, LongColumnStatsData> b) {
+        return Long.compare(a.getValue().getHighValue(), b.getValue().getHighValue());
+      }
+    });
+    idxOfSmallest = adjustedIndexMap.get(entries.get(0).getKey());
+    idxOfLargest = adjustedIndexMap.get(entries.get(last).getKey());
+    smallest = entries.get(0).getValue().getHighValue();
+    largest = entries.get(last).getValue().getHighValue();
+    final long highValue;
+    if (idxOfSmallest == idxOfLargest) {
+      highValue = smallest;
+    } else if (idxOfSmallest < idxOfLargest) {
+      // values grow towards the right, so the maximum sits on the right border
+      highValue = (long) (smallest
+          + (largest - smallest) * (rightBorder - idxOfSmallest) / (idxOfLargest - idxOfSmallest));
+    } else {
+      // values grow towards the left, so the maximum sits on the left border
+      highValue = (long) (smallest
+          + (largest - smallest) * idxOfSmallest / (idxOfSmallest - idxOfLargest));
+    }
+
+    // ---- number of nulls: sum what we have, then scale to all requested partitions
+    long nullTotal = 0;
+    for (LongColumnStatsData stats : longStatsByPart.values()) {
+      nullTotal += stats.getNumNulls();
+    }
+    final long numNulls = nullTotal * numParts / numPartsWithStats;
+
+    // ---- number of distinct values
+    Collections.sort(entries, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, LongColumnStatsData> a,
+          Map.Entry<String, LongColumnStatsData> b) {
+        return Long.compare(a.getValue().getNumDVs(), b.getValue().getNumDVs());
+      }
+    });
+    // the largest single NDV is the floor, the sum of all NDVs is the ceiling
+    final long lowerBound = entries.get(last).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, LongColumnStatsData> e : entries) {
+      higherBound += e.getValue().getNumDVs();
+    }
+    final long ndv;
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      // derive the NDV from the extrapolated value range and clamp it into the bounds
+      long estimate = (long) ((highValue - lowValue) / densityAvg);
+      if (estimate < lowerBound) {
+        ndv = lowerBound;
+      } else if (estimate > higherBound) {
+        ndv = higherBound;
+      } else {
+        ndv = estimate;
+      }
+    } else {
+      idxOfSmallest = adjustedIndexMap.get(entries.get(0).getKey());
+      idxOfLargest = adjustedIndexMap.get(entries.get(last).getKey());
+      smallest = entries.get(0).getValue().getNumDVs();
+      largest = entries.get(last).getValue().getNumDVs();
+      if (idxOfSmallest == idxOfLargest) {
+        ndv = smallest;
+      } else if (idxOfSmallest < idxOfLargest) {
+        // the maximum sits on the right border
+        ndv = (long) (smallest
+            + (largest - smallest) * (rightBorder - idxOfSmallest) / (idxOfLargest - idxOfSmallest));
+      } else {
+        // the maximum sits on the left border
+        ndv = (long) (smallest
+            + (largest - smallest) * idxOfSmallest / (idxOfSmallest - idxOfLargest));
+      }
+    }
+
+    LongColumnStatsDataInspector result = new LongColumnStatsDataInspector();
+    result.setLowValue(lowValue);
+    result.setHighValue(highValue);
+    result.setNumNulls(numNulls);
+    result.setNumDVs(ndv);
+    extrapolateData.setLongStats(result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
new file mode 100644
index 0000000..4539e6b
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Aggregates per-partition string column statistics into one
+ * {@link ColumnStatisticsObj}. When every requested partition has statistics (or
+ * fewer than two stats objects are supplied) the values are merged directly;
+ * otherwise the missing partitions are accounted for by linear extrapolation.
+ */
+public class StringColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
+
+  // Must name this class: it previously pointed at LongColumnStatsAggregator, which
+  // attributed every log line of this aggregator to the wrong logger.
+  private static final Logger LOG = LoggerFactory.getLogger(StringColumnStatsAggregator.class);
+
+  /**
+   * Aggregates the string statistics of one column over a set of partitions.
+   *
+   * @param colName name of the column being aggregated
+   * @param partNames names of all partitions requested, in index order
+   * @param css statistics of the partitions that have them; each element must hold
+   *          exactly one {@link ColumnStatisticsObj}
+   * @return the aggregated statistics object for {@code colName}
+   * @throws MetaException if an element of {@code css} does not contain exactly one
+   *           stats object
+   */
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors. Only when both of the conditions are true, we merge bit
+    // vectors. Otherwise, just use the maximum function.
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for {} is {}", colName, doAllPartitionContainStats);
+    NumDistinctValueEstimator ndvEstimator = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        // the first partition decides the column type and stats kind of the result
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      StringColumnStatsDataInspector stringColumnStatsData =
+          (StringColumnStatsDataInspector) cso.getStatsData().getStringStats();
+      if (stringColumnStatsData.getNdvEstimator() == null) {
+        // one partition without an estimator disables estimator merging entirely
+        ndvEstimator = null;
+        break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = stringColumnStatsData.getNdvEstimator();
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
+      }
+    }
+    if (ndvEstimator != null) {
+      // start from an empty estimator of the same kind; the partitions are merged below
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
+    LOG.debug("all of the bit vectors can merge for {} is {}", colName, (ndvEstimator != null));
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      StringColumnStatsDataInspector aggregateData = null;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        StringColumnStatsDataInspector newData =
+            (StringColumnStatsDataInspector) cso.getStatsData().getStringStats();
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData
+              .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+          aggregateData
+              .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      if (ndvEstimator != null) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+        // use uniform distribution assumption because we can merge bitvectors
+        // to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        // aggregateData already has the ndv of the max of all
+      }
+      columnStatisticsData.setStringStats(aggregateData);
+    } else {
+      // we need extrapolation
+      LOG.debug("start extrapolation for {}", colName);
+
+      // position of every requested partition, used as the x-axis for extrapolation
+      Map<String, Integer> indexMap = new HashMap<>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
+      if (ndvEstimator == null) {
+        // if not every partition uses bitvector for ndv, we just fall back to
+        // the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all the adjacent bitvectors that we could merge and
+        // derive new partition names and index.
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        StringColumnStatsDataInspector aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          StringColumnStatsDataInspector newData =
+              (StringColumnStatsDataInspector) cso.getStatsData().getStringStats();
+          // newData.isSetBitVectors() should be true for sure because we
+          // already checked it before.
+          if (indexMap.get(partName) != curIndex) {
+            // There is bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setStringStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+              ndvEstimator = NumDistinctValueEstimatorFactory
+                  .getEmptyNumDistinctValueEstimator(ndvEstimator);
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          // curIndex now holds the index an adjacent successor would have
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            // NOTE(review): this path keeps the minimum avgColLen while the
+            // all-partitions path above keeps the maximum - confirm this is intended.
+            aggregateData.setAvgColLen(Math.min(aggregateData.getAvgColLen(),
+                newData.getAvgColLen()));
+            aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(),
+                newData.getMaxColLen()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setStringStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+        }
+      }
+      // density is not used for string columns, hence the -1 placeholder
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+          adjustedStatsMap, -1);
+    }
+    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName,
+        columnStatisticsData.getStringStats().getNumDVs(),partNames.size(), css.size());
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  /**
+   * Extrapolates string column statistics to the full set of requested partitions.
+   * Average length, maximum length and NDV are each extended linearly along the
+   * adjusted partition index; the null count is scaled by
+   * {@code numParts / numPartsWithStats}.
+   *
+   * @param extrapolateData receives the result through {@code setStringStats}
+   * @param numParts number of partitions requested; also used as the right border index
+   * @param numPartsWithStats number of partitions that actually have statistics
+   * @param adjustedIndexMap (pseudo) partition name to its index position
+   * @param adjustedStatsMap (pseudo) partition name to its statistics; must not be empty
+   * @param densityAvg not used by this implementation
+   */
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    StringColumnStatsDataInspector extrapolateStringData = new StringColumnStatsDataInspector();
+    Map<String, StringColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getStringStats());
+    }
+    List<Map.Entry<String, StringColumnStatsData>> list = new LinkedList<>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the avgLen
+    Collections.sort(list, new Comparator<Map.Entry<String, StringColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, StringColumnStatsData> o1,
+          Map.Entry<String, StringColumnStatsData> o2) {
+        return Double.compare(o1.getValue().getAvgColLen(), o2.getValue().getAvgColLen());
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double avgColLen = 0;
+    double min = list.get(0).getValue().getAvgColLen();
+    double max = list.get(list.size() - 1).getValue().getAvgColLen();
+    if (minInd == maxInd) {
+      avgColLen = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      avgColLen = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      avgColLen = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the maxLen
+    Collections.sort(list, new Comparator<Map.Entry<String, StringColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, StringColumnStatsData> o1,
+          Map.Entry<String, StringColumnStatsData> o2) {
+        return Long.compare(o1.getValue().getMaxColLen(), o2.getValue().getMaxColLen());
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double maxColLen = 0;
+    // Read the max lengths the list was just sorted by; reading getAvgColLen() here
+    // (as before) extrapolated the maximum length from average lengths.
+    min = list.get(0).getValue().getMaxColLen();
+    max = list.get(list.size() - 1).getValue().getMaxColLen();
+    if (minInd == maxInd) {
+      maxColLen = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      maxColLen = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      maxColLen = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, StringColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up sumNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, StringColumnStatsData>>() {
+      @Override
+      public int compare(Map.Entry<String, StringColumnStatsData> o1,
+          Map.Entry<String, StringColumnStatsData> o2) {
+        return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs());
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    min = list.get(0).getValue().getNumDVs();
+    max = list.get(list.size() - 1).getValue().getNumDVs();
+    if (minInd == maxInd) {
+      ndv = (long) min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
+    }
+    extrapolateStringData.setAvgColLen(avgColLen);
+    extrapolateStringData.setMaxColLen((long) maxColLen);
+    extrapolateStringData.setNumNulls(numNulls);
+    extrapolateStringData.setNumDVs(ndv);
+    extrapolateData.setStringStats(extrapolateStringData);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DateColumnStatsDataInspector.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DateColumnStatsDataInspector.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DateColumnStatsDataInspector.java
new file mode 100644
index 0000000..937ebf2
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DateColumnStatsDataInspector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.cache;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+
+/**
+ * {@link DateColumnStatsData} that can hold its distinct-value information either as
+ * serialized bit vectors (in the superclass) or as a live
+ * {@link NumDistinctValueEstimator}. The accessors below convert between the two
+ * forms on demand: reading the bit vectors serializes and drops a held estimator,
+ * and {@link #getNdvEstimator()} deserializes the bit vectors and unsets them.
+ */
+@SuppressWarnings("serial")
+public class DateColumnStatsDataInspector extends DateColumnStatsData {
+
+  // Live estimator; null whenever the information is held as bit vectors (or absent).
+  private NumDistinctValueEstimator ndvEstimator;
+
+  /** Creates an inspector through the superclass default constructor, with no estimator. */
+  public DateColumnStatsDataInspector() {
+    super();
+  }
+
+  /**
+   * Creates an inspector with the given counts and no estimator.
+   *
+   * @param numNulls number of null values
+   * @param numDVs number of distinct values
+   */
+  public DateColumnStatsDataInspector(long numNulls, long numDVs) {
+    super(numNulls, numDVs);
+  }
+
+  /**
+   * Copy constructor. If {@code other} currently holds a live estimator, it is
+   * serialized into the bit vectors of the new instance.
+   *
+   * @param other the instance to copy
+   */
+  public DateColumnStatsDataInspector(DateColumnStatsDataInspector other) {
+    super(other);
+    if (other.ndvEstimator != null) {
+      // Serialize the SOURCE's estimator. This instance's own field is still null
+      // here, so dereferencing it (as the code did before) threw a
+      // NullPointerException on every copy of an estimator-backed instance.
+      super.setBitVectors(other.ndvEstimator.serialize());
+    }
+  }
+
+  /** Returns a copy of this instance, typed as the inspector subclass. */
+  @Override
+  public DateColumnStatsDataInspector deepCopy() {
+    return new DateColumnStatsDataInspector(this);
+  }
+
+  /** Returns the bit vectors, serializing a held estimator into them first. */
+  @Override
+  public byte[] getBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.getBitVectors();
+  }
+
+  /** Returns the bit vector buffer, serializing a held estimator into it first. */
+  @Override
+  public ByteBuffer bufferForBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.bufferForBitVectors();
+  }
+
+  /** Sets the bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(byte[] bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Sets the bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(ByteBuffer bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Unsets the bit vectors and discards any held estimator. */
+  @Override
+  public void unsetBitVectors() {
+    super.unsetBitVectors();
+    this.ndvEstimator = null;
+  }
+
+  /** Reports whether bit vectors are set, after serializing a held estimator into them. */
+  @Override
+  public boolean isSetBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.isSetBitVectors();
+  }
+
+  /** Serializes a held estimator into the bit vectors, then forwards the flag. */
+  @Override
+  public void setBitVectorsIsSet(boolean value) {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    super.setBitVectorsIsSet(value);
+  }
+
+  /**
+   * Returns the estimator, building it from the bit vectors when they are set and
+   * non-empty.
+   *
+   * @return the estimator, or {@code null} if none can be derived
+   */
+  public NumDistinctValueEstimator getNdvEstimator() {
+    if (isSetBitVectors() && getBitVectors().length != 0) {
+      updateNdvEstimator();
+    }
+    return ndvEstimator;
+  }
+
+  /** Stores the given estimator and unsets the serialized bit vectors. */
+  public void setNdvEstimator(NumDistinctValueEstimator ndvEstimator) {
+    super.unsetBitVectors();
+    this.ndvEstimator = ndvEstimator;
+  }
+
+  // estimator -> bit vectors; the estimator is dropped afterwards
+  private void updateBitVectors() {
+    super.setBitVectors(ndvEstimator.serialize());
+    this.ndvEstimator = null;
+  }
+
+  // bit vectors -> estimator; the bit vectors are unset afterwards
+  private void updateNdvEstimator() {
+    this.ndvEstimator = NumDistinctValueEstimatorFactory
+        .getNumDistinctValueEstimator(super.getBitVectors());
+    super.unsetBitVectors();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DecimalColumnStatsDataInspector.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DecimalColumnStatsDataInspector.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DecimalColumnStatsDataInspector.java
new file mode 100644
index 0000000..586b5d8
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DecimalColumnStatsDataInspector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.cache;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+
+/**
+ * {@link DecimalColumnStatsData} that can hold its distinct-value information either
+ * as serialized bit vectors (in the superclass) or as a live
+ * {@link NumDistinctValueEstimator}. The accessors below convert between the two
+ * forms on demand: reading the bit vectors serializes and drops a held estimator,
+ * and {@link #getNdvEstimator()} deserializes the bit vectors and unsets them.
+ */
+@SuppressWarnings("serial")
+public class DecimalColumnStatsDataInspector extends DecimalColumnStatsData {
+
+  // Live estimator; null whenever the information is held as bit vectors (or absent).
+  private NumDistinctValueEstimator ndvEstimator;
+
+  /** Creates an inspector through the superclass default constructor, with no estimator. */
+  public DecimalColumnStatsDataInspector() {
+    super();
+  }
+
+  /**
+   * Creates an inspector with the given counts and no estimator.
+   *
+   * @param numNulls number of null values
+   * @param numDVs number of distinct values
+   */
+  public DecimalColumnStatsDataInspector(long numNulls, long numDVs) {
+    super(numNulls, numDVs);
+  }
+
+  /**
+   * Copy constructor. If {@code other} currently holds a live estimator, it is
+   * serialized into the bit vectors of the new instance.
+   *
+   * @param other the instance to copy
+   */
+  public DecimalColumnStatsDataInspector(DecimalColumnStatsDataInspector other) {
+    super(other);
+    if (other.ndvEstimator != null) {
+      // Serialize the SOURCE's estimator. This instance's own field is still null
+      // here, so dereferencing it (as the code did before) threw a
+      // NullPointerException on every copy of an estimator-backed instance.
+      super.setBitVectors(other.ndvEstimator.serialize());
+    }
+  }
+
+  /** Returns a copy of this instance, typed as the inspector subclass. */
+  @Override
+  public DecimalColumnStatsDataInspector deepCopy() {
+    return new DecimalColumnStatsDataInspector(this);
+  }
+
+  /** Returns the bit vectors, serializing a held estimator into them first. */
+  @Override
+  public byte[] getBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.getBitVectors();
+  }
+
+  /** Returns the bit vector buffer, serializing a held estimator into it first. */
+  @Override
+  public ByteBuffer bufferForBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.bufferForBitVectors();
+  }
+
+  /** Sets the bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(byte[] bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Sets the bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(ByteBuffer bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Unsets the bit vectors and discards any held estimator. */
+  @Override
+  public void unsetBitVectors() {
+    super.unsetBitVectors();
+    this.ndvEstimator = null;
+  }
+
+  /** Reports whether bit vectors are set, after serializing a held estimator into them. */
+  @Override
+  public boolean isSetBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.isSetBitVectors();
+  }
+
+  /** Serializes a held estimator into the bit vectors, then forwards the flag. */
+  @Override
+  public void setBitVectorsIsSet(boolean value) {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    super.setBitVectorsIsSet(value);
+  }
+
+  /**
+   * Returns the estimator, building it from the bit vectors when they are set and
+   * non-empty.
+   *
+   * @return the estimator, or {@code null} if none can be derived
+   */
+  public NumDistinctValueEstimator getNdvEstimator() {
+    if (isSetBitVectors() && getBitVectors().length != 0) {
+      updateNdvEstimator();
+    }
+    return ndvEstimator;
+  }
+
+  /** Stores the given estimator and unsets the serialized bit vectors. */
+  public void setNdvEstimator(NumDistinctValueEstimator ndvEstimator) {
+    super.unsetBitVectors();
+    this.ndvEstimator = ndvEstimator;
+  }
+
+  // estimator -> bit vectors; the estimator is dropped afterwards
+  private void updateBitVectors() {
+    super.setBitVectors(ndvEstimator.serialize());
+    this.ndvEstimator = null;
+  }
+
+  // bit vectors -> estimator; the bit vectors are unset afterwards
+  private void updateNdvEstimator() {
+    this.ndvEstimator = NumDistinctValueEstimatorFactory
+        .getNumDistinctValueEstimator(super.getBitVectors());
+    super.unsetBitVectors();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DoubleColumnStatsDataInspector.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DoubleColumnStatsDataInspector.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DoubleColumnStatsDataInspector.java
new file mode 100644
index 0000000..3609ddd
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/DoubleColumnStatsDataInspector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.cache;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+
+/**
+ * {@link DoubleColumnStatsData} that can hold its distinct-value information either
+ * as serialized bit vectors (in the superclass) or as a live
+ * {@link NumDistinctValueEstimator}. The accessors below convert between the two
+ * forms on demand: reading the bit vectors serializes and drops a held estimator,
+ * and {@link #getNdvEstimator()} deserializes the bit vectors and unsets them.
+ */
+@SuppressWarnings("serial")
+public class DoubleColumnStatsDataInspector extends DoubleColumnStatsData {
+
+  // Live estimator; null whenever the information is held as bit vectors (or absent).
+  private NumDistinctValueEstimator ndvEstimator;
+
+  /** Creates an inspector through the superclass default constructor, with no estimator. */
+  public DoubleColumnStatsDataInspector() {
+    super();
+  }
+
+  /**
+   * Creates an inspector with the given counts and no estimator.
+   *
+   * @param numNulls number of null values
+   * @param numDVs number of distinct values
+   */
+  public DoubleColumnStatsDataInspector(long numNulls, long numDVs) {
+    super(numNulls, numDVs);
+  }
+
+  /**
+   * Copy constructor. If {@code other} currently holds a live estimator, it is
+   * serialized into the bit vectors of the new instance.
+   *
+   * @param other the instance to copy
+   */
+  public DoubleColumnStatsDataInspector(DoubleColumnStatsDataInspector other) {
+    super(other);
+    if (other.ndvEstimator != null) {
+      // Serialize the SOURCE's estimator. This instance's own field is still null
+      // here, so dereferencing it (as the code did before) threw a
+      // NullPointerException on every copy of an estimator-backed instance.
+      super.setBitVectors(other.ndvEstimator.serialize());
+    }
+  }
+
+  /** Returns a copy of this instance, typed as the inspector subclass. */
+  @Override
+  public DoubleColumnStatsDataInspector deepCopy() {
+    return new DoubleColumnStatsDataInspector(this);
+  }
+
+  /** Returns the bit vectors, serializing a held estimator into them first. */
+  @Override
+  public byte[] getBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.getBitVectors();
+  }
+
+  /** Returns the bit vector buffer, serializing a held estimator into it first. */
+  @Override
+  public ByteBuffer bufferForBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.bufferForBitVectors();
+  }
+
+  /** Sets the bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(byte[] bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Sets the bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(ByteBuffer bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Unsets the bit vectors and discards any held estimator. */
+  @Override
+  public void unsetBitVectors() {
+    super.unsetBitVectors();
+    this.ndvEstimator = null;
+  }
+
+  /** Reports whether bit vectors are set, after serializing a held estimator into them. */
+  @Override
+  public boolean isSetBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.isSetBitVectors();
+  }
+
+  /** Serializes a held estimator into the bit vectors, then forwards the flag. */
+  @Override
+  public void setBitVectorsIsSet(boolean value) {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    super.setBitVectorsIsSet(value);
+  }
+
+  /**
+   * Returns the estimator, building it from the bit vectors when they are set and
+   * non-empty.
+   *
+   * @return the estimator, or {@code null} if none can be derived
+   */
+  public NumDistinctValueEstimator getNdvEstimator() {
+    if (isSetBitVectors() && getBitVectors().length != 0) {
+      updateNdvEstimator();
+    }
+    return ndvEstimator;
+  }
+
+  /** Stores the given estimator and unsets the serialized bit vectors. */
+  public void setNdvEstimator(NumDistinctValueEstimator ndvEstimator) {
+    super.unsetBitVectors();
+    this.ndvEstimator = ndvEstimator;
+  }
+
+  // estimator -> bit vectors; the estimator is dropped afterwards
+  private void updateBitVectors() {
+    super.setBitVectors(ndvEstimator.serialize());
+    this.ndvEstimator = null;
+  }
+
+  // bit vectors -> estimator; the bit vectors are unset afterwards
+  private void updateNdvEstimator() {
+    this.ndvEstimator = NumDistinctValueEstimatorFactory
+        .getNumDistinctValueEstimator(super.getBitVectors());
+    super.unsetBitVectors();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/LongColumnStatsDataInspector.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/LongColumnStatsDataInspector.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/LongColumnStatsDataInspector.java
new file mode 100644
index 0000000..5632d91
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/LongColumnStatsDataInspector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.cache;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+
+/**
+ * {@link LongColumnStatsData} that can hold its distinct-value state either
+ * as serialized bit vectors (in the superclass) or as a live
+ * {@link NumDistinctValueEstimator}, converting between the two on demand.
+ *
+ * <p>At most one form is populated at a time: installing an estimator clears
+ * the bit vectors, and every accessor of the bit vectors first serializes and
+ * releases a held estimator.
+ */
+@SuppressWarnings("serial")
+public class LongColumnStatsDataInspector extends LongColumnStatsData {
+
+  /** Live estimator; {@code null} whenever the serialized form is in use. */
+  private NumDistinctValueEstimator ndvEstimator;
+
+  /** Creates an empty instance with no NDV state. */
+  public LongColumnStatsDataInspector() {
+    super();
+  }
+
+  /**
+   * Creates an instance with the given counters.
+   *
+   * @param numNulls passed to the superclass constructor
+   * @param numDVs passed to the superclass constructor
+   */
+  public LongColumnStatsDataInspector(long numNulls, long numDVs) {
+    super(numNulls, numDVs);
+  }
+
+  /**
+   * Copy constructor. If {@code other} still holds a live estimator after
+   * the superclass copy, its serialized form is stored in the new object.
+   *
+   * @param other instance to copy
+   */
+  public LongColumnStatsDataInspector(LongColumnStatsDataInspector other) {
+    super(other);
+    if (other.ndvEstimator != null) {
+      // Must read other's estimator: this.ndvEstimator is still null here,
+      // so dereferencing it would throw a NullPointerException.
+      super.setBitVectors(other.ndvEstimator.serialize());
+    }
+  }
+
+  /** Returns a copy produced by the copy constructor. */
+  @Override
+  public LongColumnStatsDataInspector deepCopy() {
+    return new LongColumnStatsDataInspector(this);
+  }
+
+  /**
+   * Returns the serialized bit vectors, first flushing a held estimator
+   * into them.
+   */
+  @Override
+  public byte[] getBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.getBitVectors();
+  }
+
+  /**
+   * Returns the superclass' buffer for the bit vectors, first flushing a
+   * held estimator into it.
+   */
+  @Override
+  public ByteBuffer bufferForBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.bufferForBitVectors();
+  }
+
+  /** Stores serialized bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(byte[] bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Stores serialized bit vectors and discards any held estimator. */
+  @Override
+  public void setBitVectors(ByteBuffer bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  /** Clears both the serialized bit vectors and any held estimator. */
+  @Override
+  public void unsetBitVectors() {
+    super.unsetBitVectors();
+    this.ndvEstimator = null;
+  }
+
+  /**
+   * Reports whether bit vectors are set, after flushing a held estimator
+   * into the serialized form.
+   */
+  @Override
+  public boolean isSetBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.isSetBitVectors();
+  }
+
+  /**
+   * Forwards the "is set" flag to the superclass, after flushing a held
+   * estimator into the serialized form.
+   */
+  @Override
+  public void setBitVectorsIsSet(boolean value) {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    super.setBitVectorsIsSet(value);
+  }
+
+  /**
+   * Returns an estimator decoded from the bit vectors.
+   *
+   * <p>{@link #isSetBitVectors()} flushes any held estimator first, so the
+   * returned object is rebuilt from the serialized bytes.
+   *
+   * @return the decoded estimator, or {@code null} when bit vectors are
+   *         unset or empty
+   */
+  public NumDistinctValueEstimator getNdvEstimator() {
+    if (isSetBitVectors() && getBitVectors().length != 0) {
+      updateNdvEstimator();
+    }
+    return ndvEstimator;
+  }
+
+  /**
+   * Installs an estimator as the NDV state and clears the serialized bit
+   * vectors.
+   *
+   * @param ndvEstimator estimator to hold; stored as given
+   */
+  public void setNdvEstimator(NumDistinctValueEstimator ndvEstimator) {
+    super.unsetBitVectors();
+    this.ndvEstimator = ndvEstimator;
+  }
+
+  /** Serializes the held estimator into the superclass, then releases it. */
+  private void updateBitVectors() {
+    super.setBitVectors(ndvEstimator.serialize());
+    this.ndvEstimator = null;
+  }
+
+  /** Decodes the serialized bit vectors into an estimator, then clears them. */
+  private void updateNdvEstimator() {
+    this.ndvEstimator = NumDistinctValueEstimatorFactory
+        .getNumDistinctValueEstimator(super.getBitVectors());
+    super.unsetBitVectors();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/StringColumnStatsDataInspector.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/StringColumnStatsDataInspector.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/StringColumnStatsDataInspector.java
new file mode 100644
index 0000000..2db037b
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/cache/StringColumnStatsDataInspector.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.cache;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+
+@SuppressWarnings("serial")
+public class StringColumnStatsDataInspector extends StringColumnStatsData {
+
+  private NumDistinctValueEstimator ndvEstimator;
+
+  public StringColumnStatsDataInspector() {
+    super();
+  }
+
+  public StringColumnStatsDataInspector(long maxColLen, double avgColLen,
+      long numNulls, long numDVs) {
+    super(maxColLen, avgColLen, numNulls, numDVs);
+  }
+
+  public StringColumnStatsDataInspector(StringColumnStatsDataInspector other) {
+    super(other);
+    if (other.ndvEstimator != null) {
+      super.setBitVectors(ndvEstimator.serialize());
+    }
+  }
+
+  @Override
+  public StringColumnStatsDataInspector deepCopy() {
+    return new StringColumnStatsDataInspector(this);
+  }
+
+  @Override
+  public byte[] getBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.getBitVectors();
+  }
+
+  @Override
+  public ByteBuffer bufferForBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.bufferForBitVectors();
+  }
+
+  @Override
+  public void setBitVectors(byte[] bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  @Override
+  public void setBitVectors(ByteBuffer bitVectors) {
+    super.setBitVectors(bitVectors);
+    this.ndvEstimator = null;
+  }
+
+  @Override
+  public void unsetBitVectors() {
+    super.unsetBitVectors();
+    this.ndvEstimator = null;
+  }
+
+  @Override
+  public boolean isSetBitVectors() {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    return super.isSetBitVectors();
+  }
+
+  @Override
+  public void setBitVectorsIsSet(boolean value) {
+    if (ndvEstimator != null) {
+      updateBitVectors();
+    }
+    super.setBitVectorsIsSet(value);
+  }
+
+  public NumDistinctValueEstimator getNdvEstimator() {
+    if (isSetBitVectors() && getBitVectors().length != 0) {
+      updateNdvEstimator();
+    }
+    return ndvEstimator;
+  }
+
+  public void setNdvEstimator(NumDistinctValueEstimator ndvEstimator) {
+    super.unsetBitVectors();
+    this.ndvEstimator = ndvEstimator;
+  }
+
+  private void updateBitVectors() {
+    super.setBitVectors(ndvEstimator.serialize());
+    this.ndvEstimator = null;
+  }
+
+  private void updateNdvEstimator() {
+    this.ndvEstimator = NumDistinctValueEstimatorFactory
+        .getNumDistinctValueEstimator(super.getBitVectors());
+    super.unsetBitVectors();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
new file mode 100644
index 0000000..1c2402f
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+
+/**
+ * Merges binary column statistics into an aggregate, in place.
+ */
+public class BinaryColumnStatsMerger extends ColumnStatsMerger {
+
+  /**
+   * Folds {@code newColStats} into {@code aggregateColStats}: the maximum
+   * column length and the average column length each become the larger of
+   * the two inputs, and the null counts are summed.
+   *
+   * @param aggregateColStats running aggregate; its binary stats are updated
+   * @param newColStats statistics to fold in; only read
+   */
+  @Override
+  public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+    BinaryColumnStatsData target = aggregateColStats.getStatsData().getBinaryStats();
+    BinaryColumnStatsData incoming = newColStats.getStatsData().getBinaryStats();
+
+    target.setMaxColLen(Math.max(target.getMaxColLen(), incoming.getMaxColLen()));
+    // Note: the average is combined with max(), not a weighted mean.
+    target.setAvgColLen(Math.max(target.getAvgColLen(), incoming.getAvgColLen()));
+    target.setNumNulls(target.getNumNulls() + incoming.getNumNulls());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
new file mode 100644
index 0000000..fd6b87a
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+
+/**
+ * Merges boolean column statistics into an aggregate, in place.
+ */
+public class BooleanColumnStatsMerger extends ColumnStatsMerger {
+
+  /**
+   * Adds the true, false and null counts of {@code newColStats} to those of
+   * {@code aggregateColStats}.
+   *
+   * @param aggregateColStats running aggregate; its boolean stats are updated
+   * @param newColStats statistics to fold in; only read
+   */
+  @Override
+  public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+    BooleanColumnStatsData target = aggregateColStats.getStatsData().getBooleanStats();
+    BooleanColumnStatsData incoming = newColStats.getStatsData().getBooleanStats();
+
+    target.setNumTrues(target.getNumTrues() + incoming.getNumTrues());
+    target.setNumFalses(target.getNumFalses() + incoming.getNumFalses());
+    target.setNumNulls(target.getNumNulls() + incoming.getNumNulls());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
new file mode 100644
index 0000000..ce55756
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the per-type column statistics mergers. Subclasses fold one
+ * set of column statistics into a running aggregate.
+ */
+public abstract class ColumnStatsMerger {
+  /** Logger shared with subclasses; named after this base class. */
+  protected final Logger LOG = LoggerFactory.getLogger(ColumnStatsMerger.class.getName());
+
+  /**
+   * Merges {@code newColStats} into {@code aggregateColStats}.
+   *
+   * @param aggregateColStats running aggregate that receives the merged values
+   * @param newColStats statistics to fold into the aggregate
+   */
+  public abstract void merge(ColumnStatisticsObj aggregateColStats,
+      ColumnStatisticsObj newColStats);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
new file mode 100644
index 0000000..1a2d38e
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
+
+/**
+ * Static factory for {@link ColumnStatsMerger} instances and for empty
+ * {@link ColumnStatisticsObj} containers of a given statistics type.
+ */
+public class ColumnStatsMergerFactory {
+
+  private ColumnStatsMergerFactory() {
+  }
+
+  /**
+   * Returns the merger matching the statistics type carried by both objects.
+   *
+   * @param statsObjNew statistics to be merged in
+   * @param statsObjOld statistics to be merged into
+   * @return a new merger for the common statistics type
+   * @throws IllegalArgumentException if the two objects carry different
+   *         statistics types, if no type is set, or if the type is unknown
+   */
+  public static ColumnStatsMerger getColumnStatsMerger(ColumnStatisticsObj statsObjNew,
+      ColumnStatisticsObj statsObjOld) {
+    _Fields typeNew = statsObjNew.getStatsData().getSetField();
+    _Fields typeOld = statsObjOld.getStatsData().getSetField();
+    // Both sides must carry the same, non-null type. Switching on a null
+    // enum would otherwise fail with a NullPointerException that has no message.
+    if (typeNew == null || typeNew != typeOld) {
+      throw new IllegalArgumentException(
+          "The column stats types must match: [" + typeNew + "::" + typeOld + "]");
+    }
+    switch (typeNew) {
+    case BOOLEAN_STATS:
+      return new BooleanColumnStatsMerger();
+    case LONG_STATS:
+      return new LongColumnStatsMerger();
+    case DOUBLE_STATS:
+      return new DoubleColumnStatsMerger();
+    case STRING_STATS:
+      return new StringColumnStatsMerger();
+    case BINARY_STATS:
+      return new BinaryColumnStatsMerger();
+    case DECIMAL_STATS:
+      return new DecimalColumnStatsMerger();
+    case DATE_STATS:
+      return new DateColumnStatsMerger();
+    default:
+      throw new IllegalArgumentException("Unknown stats type " + typeNew.toString());
+    }
+  }
+
+  /**
+   * Builds a {@link ColumnStatisticsObj} for the given column whose stats
+   * data holds a fresh, empty statistics object of the requested type.
+   *
+   * @param colName column name to set on the result
+   * @param colType column type string to set on the result
+   * @param type which statistics variant to create
+   * @return the populated statistics object
+   * @throws IllegalArgumentException if {@code type} is not a known variant
+   */
+  public static ColumnStatisticsObj newColumnStaticsObj(String colName, String colType,
+      _Fields type) {
+    ColumnStatisticsObj cso = new ColumnStatisticsObj();
+    ColumnStatisticsData csd = new ColumnStatisticsData();
+    cso.setColName(colName);
+    cso.setColType(colType);
+    switch (type) {
+    case BOOLEAN_STATS:
+      csd.setBooleanStats(new BooleanColumnStatsData());
+      break;
+
+    case LONG_STATS:
+      csd.setLongStats(new LongColumnStatsDataInspector());
+      break;
+
+    case DOUBLE_STATS:
+      csd.setDoubleStats(new DoubleColumnStatsDataInspector());
+      break;
+
+    case STRING_STATS:
+      csd.setStringStats(new StringColumnStatsDataInspector());
+      break;
+
+    case BINARY_STATS:
+      csd.setBinaryStats(new BinaryColumnStatsData());
+      break;
+
+    case DECIMAL_STATS:
+      csd.setDecimalStats(new DecimalColumnStatsDataInspector());
+      break;
+
+    case DATE_STATS:
+      csd.setDateStats(new DateColumnStatsDataInspector());
+      break;
+
+    default:
+      throw new IllegalArgumentException("Unknown stats type");
+    }
+
+    cso.setStatsData(csd);
+    return cso;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
new file mode 100644
index 0000000..5baebbb
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+
+/**
+ * Merges date column statistics into an aggregate, in place.
+ */
+public class DateColumnStatsMerger extends ColumnStatsMerger {
+
+  /**
+   * Folds {@code newColStats} into {@code aggregateColStats}: the low/high
+   * bounds are widened, null counts are summed, and the number of distinct
+   * values is taken from the merged NDV estimators when both sides have
+   * compatible ones, or else from the larger of the two stored counts.
+   *
+   * @param aggregateColStats running aggregate; its date stats are updated
+   * @param newColStats statistics to fold in
+   */
+  @Override
+  public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+    DateColumnStatsDataInspector aggregateData =
+        (DateColumnStatsDataInspector) aggregateColStats.getStatsData().getDateStats();
+    DateColumnStatsDataInspector newData =
+        (DateColumnStatsDataInspector) newColStats.getStatsData().getDateStats();
+
+    // A bound missing on one side must not abort the merge with a
+    // NullPointerException; the other side's bound is used instead.
+    Date lowValue = lowerOf(aggregateData.getLowValue(), newData.getLowValue());
+    if (lowValue != null) {
+      aggregateData.setLowValue(lowValue);
+    }
+    Date highValue = higherOf(aggregateData.getHighValue(), newData.getHighValue());
+    if (highValue != null) {
+      aggregateData.setHighValue(highValue);
+    }
+
+    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+
+    // Fetch each estimator once. The new side is only queried when the
+    // aggregate has an estimator, as in the original short-circuit check.
+    NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newEst = oldEst == null ? null : newData.getNdvEstimator();
+    if (oldEst == null || newEst == null) {
+      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+      return;
+    }
+
+    final long ndv;
+    if (oldEst.canMerge(newEst)) {
+      oldEst.mergeEstimators(newEst);
+      ndv = oldEst.estimateNumDistinctValues();
+      aggregateData.setNdvEstimator(oldEst);
+    } else {
+      ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+    }
+    LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}",
+        aggregateColStats.getColName(), aggregateData.getNumDVs(), newData.getNumDVs(), ndv);
+    aggregateData.setNumDVs(ndv);
+  }
+
+  /** Returns the smaller bound; on a tie the candidate wins; null means "absent". */
+  private static Date lowerOf(Date current, Date candidate) {
+    if (current == null) {
+      return candidate;
+    }
+    if (candidate == null) {
+      return current;
+    }
+    return current.compareTo(candidate) < 0 ? current : candidate;
+  }
+
+  /** Returns the larger bound; on a tie the current value wins; null means "absent". */
+  private static Date higherOf(Date current, Date candidate) {
+    if (current == null) {
+      return candidate;
+    }
+    if (candidate == null) {
+      return current;
+    }
+    return current.compareTo(candidate) >= 0 ? current : candidate;
+  }
+}

Reply via email to