http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
new file mode 100644
index 0000000..c52746e
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.ndv.hll;
+
+import java.util.Arrays;
+
+public class HLLDenseRegister implements HLLRegister {
+
+  // 2^p number of bytes for register
+  private byte[] register;
+
+  // max value stored in registered is cached to determine the bit width for
+  // bit packing
+  private int maxRegisterValue;
+
+  // keep count of number of zeroes in registers
+  private int numZeroes;
+
+  // compute and cache inverse power of 2 for register values
+  private double[] invPow2Register;
+
+  // number of register bits
+  private int p;
+
+  // m = 2^p
+  private int m;
+
+  public HLLDenseRegister(int p) {
+    this(p, true);
+  }
+
+  public HLLDenseRegister(int p, boolean bitPack) {
+    this.p = p;
+    this.m = 1 << p;
+    this.register = new byte[m];
+    this.invPow2Register = new double[m];
+    Arrays.fill(invPow2Register, 1.0);
+    this.maxRegisterValue = 0;
+    this.numZeroes = m;
+    if (bitPack == false) {
+      this.maxRegisterValue = 0xff;
+    }
+  }
+
+  public boolean add(long hashcode) {
+
+    // LSB p bits
+    final int registerIdx = (int) (hashcode & (m - 1));
+
+    // MSB 64 - p bits
+    final long w = hashcode >>> p;
+
+    // longest run of trailing zeroes
+    final int lr = Long.numberOfTrailingZeros(w) + 1;
+    return set(registerIdx, (byte) lr);
+  }
+
+  public boolean set(int idx, byte value) {
+    boolean updated = false;
+    if (idx < register.length && value > register[idx]) {
+
+      // update max register value
+      if (value > maxRegisterValue) {
+        maxRegisterValue = value;
+      }
+
+      // update number of zeros
+      if (register[idx] == 0 && value > 0) {
+        numZeroes--;
+      }
+
+      // set register value and compute inverse pow of 2 for register value
+      register[idx] = value;
+      invPow2Register[idx] = Math.pow(2, -value);
+
+      updated = true;
+    }
+    return updated;
+  }
+
+  public int size() {
+    return register.length;
+  }
+
+  public int getNumZeroes() {
+    return numZeroes;
+  }
+
+  public void merge(HLLRegister hllRegister) {
+    if (hllRegister instanceof HLLDenseRegister) {
+      HLLDenseRegister hdr = (HLLDenseRegister) hllRegister;
+      byte[] inRegister = hdr.getRegister();
+
+      // merge only if the register length matches
+      if (register.length != inRegister.length) {
+        throw new IllegalArgumentException(
+            "The size of register sets of HyperLogLogs to be merged does not 
match.");
+      }
+
+      // compare register values and store the max register value
+      for (int i = 0; i < inRegister.length; i++) {
+        if (inRegister[i] > register[i]) {
+          if (register[i] == 0) {
+            numZeroes--;
+          }
+          register[i] = inRegister[i];
+          invPow2Register[i] = Math.pow(2, -inRegister[i]);
+        }
+      }
+
+      // update max register value
+      if (hdr.getMaxRegisterValue() > maxRegisterValue) {
+        maxRegisterValue = hdr.getMaxRegisterValue();
+      }
+    } else {
+      throw new IllegalArgumentException("Specified register is not instance 
of HLLDenseRegister");
+    }
+  }
+
+  public byte[] getRegister() {
+    return register;
+  }
+
+  public void setRegister(byte[] register) {
+    this.register = register;
+  }
+
+  public int getMaxRegisterValue() {
+    return maxRegisterValue;
+  }
+
+  public double getSumInversePow2() {
+    double sum = 0;
+    for (double d : invPow2Register) {
+      sum += d;
+    }
+    return sum;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("HLLDenseRegister - ");
+    sb.append("p: ");
+    sb.append(p);
+    sb.append(" numZeroes: ");
+    sb.append(numZeroes);
+    sb.append(" maxRegisterValue: ");
+    sb.append(maxRegisterValue);
+    return sb.toString();
+  }
+
+  public String toExtendedString() {
+    return toString() + " register: " + Arrays.toString(register);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof HLLDenseRegister)) {
+      return false;
+    }
+    HLLDenseRegister other = (HLLDenseRegister) obj;
+    return numZeroes == other.numZeroes && maxRegisterValue == 
other.maxRegisterValue
+        && Arrays.equals(register, other.register);
+  }
+
+  @Override
+  public int hashCode() {
+    int hashcode = 0;
+    hashcode += 31 * numZeroes;
+    hashcode += 31 * maxRegisterValue;
+    hashcode += Arrays.hashCode(register);
+    return hashcode;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
new file mode 100644
index 0000000..a90094d
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.ndv.hll;
+
/**
 * Common contract for the two HyperLogLog register layouts
 * (sparse map and dense byte array).
 */
public interface HLLRegister {

  /**
   * Specify a hashcode to add to the hyperloglog register.
   * @param hashcode
   *          - hashcode to add
   * @return true if a register value is updated, else false
   */
  boolean add(long hashcode);

  /**
   * Instead of specifying a hashcode, this interface can be used to directly
   * specify the register index and register value. This is useful when
   * reconstructing a hyperloglog from a serialized representation where it is
   * not possible to regenerate the hashcode.
   * @param idx
   *          - register index
   * @param value
   *          - register value
   * @return true if the register value is updated, else false
   */
  boolean set(int idx, byte value);

  /**
   * Merge hyperloglog registers of the same type (SPARSE or DENSE register).
   * @param reg
   *          - register to be merged
   */
  void merge(HLLRegister reg);
}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
new file mode 100644
index 0000000..82085dd
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.ndv.hll;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class HLLSparseRegister implements HLLRegister {
+
+  private TreeMap<Integer,Byte> sparseMap;
+
+  // for a better insertion performance values are added to temporary unsorted
+  // list which will be merged to sparse map after a threshold
+  private int[] tempList;
+  private int tempListIdx;
+
+  // number of register bits
+  private final int p;
+
+  // new number of register bits for higher accuracy
+  private final int pPrime;
+
+  // number of bits to store the number of zero runs
+  private final int qPrime;
+
+  // masks for quicker extraction of p, pPrime, qPrime values
+  private final int mask;
+  private final int pPrimeMask;
+  private final int qPrimeMask;
+
+  public HLLSparseRegister(int p, int pp, int qp) {
+    this.p = p;
+    this.sparseMap = new TreeMap<>();
+    this.tempList = new int[HLLConstants.TEMP_LIST_DEFAULT_SIZE];
+    this.tempListIdx = 0;
+    this.pPrime = pp;
+    this.qPrime = qp;
+    this.mask = ((1 << pPrime) - 1) ^ ((1 << p) - 1);
+    this.pPrimeMask = ((1 << pPrime) - 1);
+    this.qPrimeMask = (1 << qPrime) - 1;
+  }
+
+  public boolean add(long hashcode) {
+    boolean updated = false;
+
+    // fill the temp list before merging to sparse map
+    if (tempListIdx < tempList.length) {
+      int encodedHash = encodeHash(hashcode);
+      tempList[tempListIdx++] = encodedHash;
+      updated = true;
+    } else {
+      updated = mergeTempListToSparseMap();
+    }
+
+    return updated;
+  }
+
+  /**
+   * Adds temp list to sparse map. The key for sparse map entry is the register
+   * index determined by pPrime and value is the number of trailing zeroes.
+   * @return
+   */
+  private boolean mergeTempListToSparseMap() {
+    boolean updated = false;
+    for (int i = 0; i < tempListIdx; i++) {
+      int encodedHash = tempList[i];
+      int key = encodedHash & pPrimeMask;
+      byte value = (byte) (encodedHash >>> pPrime);
+      byte nr = 0;
+      // if MSB is set to 1 then next qPrime MSB bits contains the value of
+      // number of zeroes.
+      // if MSB is set to 0 then number of zeroes is contained within pPrime - 
p
+      // bits.
+      if (encodedHash < 0) {
+        nr = (byte) (value & qPrimeMask);
+      } else {
+        nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
+      }
+      updated = set(key, nr);
+    }
+
+    // reset temp list index
+    tempListIdx = 0;
+    return updated;
+  }
+
+  /**
+   * <pre>
+   * <b>Input:</b> 64 bit hashcode
+   * 
+   * |---------w-------------| |------------p'----------|
+   * 10101101.......1010101010 10101010101 01010101010101
+   *                                       |------p-----|
+   *                                       
+   * <b>Output:</b> 32 bit int
+   * 
+   * |b| |-q'-|  |------------p'----------|
+   *  1  010101  01010101010 10101010101010
+   *                         |------p-----|
+   *                    
+   * 
+   * The default values of p', q' and b are 25, 6, 1 (total 32 bits) 
respectively.
+   * This function will return an int encoded in the following format
+   * 
+   * p  - LSB p bits represent the register index
+   * p' - LSB p' bits are used for increased accuracy in estimation
+   * q' - q' bits after p' are left as such from the hashcode if b = 0 else
+   *      q' bits encodes the longest trailing zero runs from in (w-p) input 
bits
+   * b  - 0 if longest trailing zero run is contained within (p'-p) bits
+   *      1 if longest trailing zero run is computeed from (w-p) input bits and
+   *      its value is stored in q' bits
+   * </pre>
+   * @param hashcode
+   * @return
+   */
+  public int encodeHash(long hashcode) {
+    // x = p' - p
+    int x = (int) (hashcode & mask);
+    if (x == 0) {
+      // more bits should be considered for finding q (longest zero runs)
+      // set MSB to 1
+      int ntr = Long.numberOfTrailingZeros(hashcode >> p) + 1;
+      long newHashCode = hashcode & pPrimeMask;
+      newHashCode |= ntr << pPrime;
+      newHashCode |= 0x80000000;
+      return (int) newHashCode;
+    } else {
+      // q is contained within p' - p
+      // set MSB to 0
+      return (int) (hashcode & 0x7FFFFFFF);
+    }
+  }
+
+  public int getSize() {
+
+    // merge temp list before getting the size of sparse map
+    if (tempListIdx != 0) {
+      mergeTempListToSparseMap();
+    }
+    return sparseMap.size();
+  }
+
+  public void merge(HLLRegister hllRegister) {
+    if (hllRegister instanceof HLLSparseRegister) {
+      HLLSparseRegister hsr = (HLLSparseRegister) hllRegister;
+
+      // retain only the largest value for a register index
+      for (Map.Entry<Integer, Byte> entry : hsr.getSparseMap().entrySet()) {
+        int key = entry.getKey();
+        byte value = entry.getValue();
+        set(key, value);
+      }
+    } else {
+      throw new IllegalArgumentException("Specified register not instance of 
HLLSparseRegister");
+    }
+  }
+
+  public boolean set(int key, byte value) {
+    boolean updated = false;
+
+    // retain only the largest value for a register index
+    if (sparseMap.containsKey(key)) {
+      byte containedVal = sparseMap.get(key);
+      if (value > containedVal) {
+        sparseMap.put(key, value);
+        updated = true;
+      }
+    } else {
+      sparseMap.put(key, value);
+      updated = true;
+    }
+    return updated;
+  }
+
+  public TreeMap<Integer,Byte> getSparseMap() {
+    return sparseMap;
+  }
+
+  public TreeMap<Integer,Byte> getMergedSparseMap() {
+    if (tempListIdx != 0) {
+      mergeTempListToSparseMap();
+    }
+    return sparseMap;
+  }
+
+  public int getP() {
+    return p;
+  }
+
+  public int getPPrime() {
+    return pPrime;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("HLLSparseRegister - ");
+    sb.append("p: ");
+    sb.append(p);
+    sb.append(" pPrime: ");
+    sb.append(pPrime);
+    sb.append(" qPrime: ");
+    sb.append(qPrime);
+    return sb.toString();
+  }
+
+  public String toExtendedString() {
+    return toString() + " register: " + sparseMap.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof HLLSparseRegister)) {
+      return false;
+    }
+    HLLSparseRegister other = (HLLSparseRegister) obj;
+    boolean result = p == other.p && pPrime == other.pPrime && qPrime == 
other.qPrime
+        && tempListIdx == other.tempListIdx;
+    if (result) {
+      for (int i = 0; i < tempListIdx; i++) {
+        if (tempList[i] != other.tempList[i]) {
+          return false;
+        }
+      }
+
+      result = result && sparseMap.equals(other.sparseMap);
+    }
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashcode = 0;
+    hashcode += 31 * p;
+    hashcode += 31 * pPrime;
+    hashcode += 31 * qPrime;
+    for (int i = 0; i < tempListIdx; i++) {
+      hashcode += 31 * tempList[tempListIdx];
+    }
+    hashcode += sparseMap.hashCode();
+    return hashcode;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
new file mode 100644
index 0000000..8bdb47b
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.ndv.hll;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hive.common.util.Murmur3;
+
+/**
+ * <pre>
+ * This is an implementation of the following variants of hyperloglog (HLL)
+ * algorithm 
+ * Original  - Original HLL algorithm from Flajolet et. al from
+ *             http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * HLLNoBias - Google's implementation of bias correction based on lookup table
+ *             
http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
+ * HLL++     - Google's implementation of HLL++ algorithm that uses SPARSE 
registers
+ *             
http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
+ * 
+ * Following are the constructor parameters that determines which algorithm is
+ * used
+ * <b>numRegisterIndexBits</b> - number of LSB hashcode bits to be used as 
register index.
+ *                        <i>Default is 14</i>. min = 4 and max = 16
+ * <b>numHashBits</b> - number of bits for hashcode. <i>Default is 64</i>. min 
= 32 and max = 128
+ * <b>encoding</b> - Type of encoding to use (SPARSE or DENSE). The algorithm 
automatically
+ *            switches to DENSE beyond a threshold. <i>Default: SPARSE</i>
+ * <b>enableBitPacking</b> - To enable bit packing or not. Bit packing 
improves compression
+ *                    at the cost of more CPU cycles. <i>Default: true</i>
+ * <b>noBias</b> - Use Google's bias table lookup for short range bias 
correction.
+ *          Enabling this will highly improve the estimation accuracy for short
+ *          range values. <i>Default: true</i>
+ *
+ * </pre>
+ */
+public class HyperLogLog implements NumDistinctValueEstimator {
  // hashcode width, fixed at 64 bits (not configurable - avoids per-add checks)
  private final static int DEFAULT_HASH_BITS = 64;
  // pre-computed hashes for the two boolean values
  private final static long HASH64_ZERO = Murmur3.hash64(new byte[] {0});
  private final static long HASH64_ONE = Murmur3.hash64(new byte[] {1});
  // NOTE(review): these scratch buffers are static AND mutable, so the add*
  // methods that write into them are not thread-safe - even across distinct
  // HyperLogLog instances. Confirm single-threaded use per JVM.
  private final static ByteBuffer SHORT_BUFFER = ByteBuffer.allocate(Short.BYTES);
  private final static ByteBuffer INT_BUFFER = ByteBuffer.allocate(Integer.BYTES);
  private final static ByteBuffer LONG_BUFFER = ByteBuffer.allocate(Long.BYTES);

  // register layout: SPARSE (tree map) until a size threshold, then DENSE (byte array)
  public enum EncodingType {
    SPARSE, DENSE
  }

  // number of bits to address registers
  private final int p;

  // number of registers - 2^p
  private final int m;

  // alpha * m^2 constant from the HLL paper, used in the raw estimate (refer paper)
  private float alphaMM;

  // enable/disable bias correction using table lookup
  private final boolean noBias;

  // enable/disable bitpacking
  private final boolean bitPacking;

  // Not making it configurable for perf reasons (avoid checks)
  private final int chosenHashBits = DEFAULT_HASH_BITS;

  // exactly one of these is non-null, depending on the current encoding
  private HLLDenseRegister denseRegister;
  private HLLSparseRegister sparseRegister;

  // counts are cached to avoid repeated complex computation. If register value
  // is updated the count will be computed again.
  private long cachedCount;
  private boolean invalidateCount;

  private EncodingType encoding;

  // threshold to switch from SPARSE to DENSE encoding
  private int encodingSwitchThreshold;
+
+  private HyperLogLog(HyperLogLogBuilder hllBuilder) {
+    if (hllBuilder.numRegisterIndexBits < HLLConstants.MIN_P_VALUE
+        || hllBuilder.numRegisterIndexBits > HLLConstants.MAX_P_VALUE) {
+      throw new IllegalArgumentException("p value should be between " + 
HLLConstants.MIN_P_VALUE
+          + " to " + HLLConstants.MAX_P_VALUE);
+    }
+    this.p = hllBuilder.numRegisterIndexBits;
+    this.m = 1 << p;
+    this.noBias = hllBuilder.noBias;
+    this.bitPacking = hllBuilder.bitPacking;
+
+    // the threshold should be less than 12K bytes for p = 14.
+    // The reason to divide by 5 is, in sparse mode after serialization the
+    // entriesin sparse map are compressed, and delta encoded as varints. The
+    // worst case size of varints are 5 bytes. Hence, 12K/5 ~= 2400 entries in
+    // sparse map.
+    if (bitPacking) {
+      this.encodingSwitchThreshold = ((m * 6) / 8) / 5;
+    } else {
+      // if bitpacking is disabled, all register values takes 8 bits and hence
+      // we can be more flexible with the threshold. For p=14, 16K/5 = 3200
+      // entries in sparse map can be allowed.
+      this.encodingSwitchThreshold = m / 3;
+    }
+
+    // initializeAlpha(DEFAULT_HASH_BITS);
+    // alphaMM value for 128 bits hash seems to perform better for default 64 
hash bits
+    this.alphaMM = 0.7213f / (1 + 1.079f / m);
+    // For efficiency alpha is multiplied by m^2
+    this.alphaMM = this.alphaMM * m * m;
+
+    this.cachedCount = -1;
+    this.invalidateCount = false;
+    this.encoding = hllBuilder.encoding;
+    if (encoding.equals(EncodingType.SPARSE)) {
+      this.sparseRegister = new HLLSparseRegister(p, 
HLLConstants.P_PRIME_VALUE,
+          HLLConstants.Q_PRIME_VALUE);
+      this.denseRegister = null;
+    } else {
+      this.sparseRegister = null;
+      this.denseRegister = new HLLDenseRegister(p, bitPacking);
+    }
+  }
+
  /**
   * Creates a builder for configuring a HyperLogLog instance.
   * @return a fresh builder with default settings (p = 14, SPARSE encoding,
   *         bit packing and bias correction enabled)
   */
  public static HyperLogLogBuilder builder() {
    return new HyperLogLogBuilder();
  }
+
+  public static class HyperLogLogBuilder {
+    private int numRegisterIndexBits = 14;
+    private EncodingType encoding = EncodingType.SPARSE;
+    private boolean bitPacking = true;
+    private boolean noBias = true;
+
+    public HyperLogLogBuilder() {
+    }
+
+    public HyperLogLogBuilder setNumRegisterIndexBits(int b) {
+      this.numRegisterIndexBits = b;
+      return this;
+    }
+
+    public HyperLogLogBuilder setEncoding(EncodingType enc) {
+      this.encoding = enc;
+      return this;
+    }
+
+    public HyperLogLogBuilder enableBitPacking(boolean b) {
+      this.bitPacking = b;
+      return this;
+    }
+
+    public HyperLogLogBuilder enableNoBias(boolean nb) {
+      this.noBias = nb;
+      return this;
+    }
+
+    public HyperLogLog build() {
+      return new HyperLogLog(this);
+    }
+  }
+
+  // see paper for alpha initialization.
+  private void initializeAlpha(final int hashBits) {
+    if (hashBits <= 16) {
+      alphaMM = 0.673f;
+    } else if (hashBits <= 32) {
+      alphaMM = 0.697f;
+    } else if (hashBits <= 64) {
+      alphaMM = 0.709f;
+    } else {
+      alphaMM = 0.7213f / (float) (1 + 1.079f / m);
+    }
+
+    // For efficiency alpha is multiplied by m^2
+    alphaMM = alphaMM * m * m;
+  }
+
+  public void addBoolean(boolean val) {
+    add(val ? HASH64_ONE : HASH64_ZERO);
+  }
+
+  public void addByte(byte val) {
+    add(Murmur3.hash64(new byte[] {val}));
+  }
+
+  public void addBytes(byte[] val) {
+    add(Murmur3.hash64(val));
+  }
+
+  public void addShort(short val) {
+    SHORT_BUFFER.putShort(0, val);
+    add(Murmur3.hash64(SHORT_BUFFER.array()));
+  }
+
+  public void addInt(int val) {
+    INT_BUFFER.putInt(0, val);
+    add(Murmur3.hash64(INT_BUFFER.array()));
+  }
+
+  public void addLong(long val) {
+    LONG_BUFFER.putLong(0, val);
+    add(Murmur3.hash64(LONG_BUFFER.array()));
+  }
+
+  public void addFloat(float val) {
+    INT_BUFFER.putFloat(0, val);
+    add(Murmur3.hash64(INT_BUFFER.array()));
+  }
+
+  public void addDouble(double val) {
+    LONG_BUFFER.putDouble(0, val);
+    add(Murmur3.hash64(LONG_BUFFER.array()));
+  }
+
+  public void addChar(char val) {
+    SHORT_BUFFER.putChar(0, val);
+    add(Murmur3.hash64(SHORT_BUFFER.array()));
+  }
+
+  /**
+   * Java's default charset will be used for strings.
+   * @param val
+   *          - input string
+   */
+  public void addString(String val) {
+    add(Murmur3.hash64(val.getBytes()));
+  }
+
+  public void addString(String val, Charset charset) {
+    add(Murmur3.hash64(val.getBytes(charset)));
+  }
+
+  public void add(long hashcode) {
+    if (encoding.equals(EncodingType.SPARSE)) {
+      if (sparseRegister.add(hashcode)) {
+        invalidateCount = true;
+      }
+
+      // if size of sparse map excess the threshold convert the sparse map to
+      // dense register and switch to DENSE encoding
+      if (sparseRegister.getSize() > encodingSwitchThreshold) {
+        encoding = EncodingType.DENSE;
+        denseRegister = sparseToDenseRegister(sparseRegister);
+        sparseRegister = null;
+        invalidateCount = true;
+      }
+    } else {
+      if (denseRegister.add(hashcode)) {
+        invalidateCount = true;
+      }
+    }
+  }
+
+  public long estimateNumDistinctValues() {
+    // FMSketch treats the ndv of all nulls as 1 but hll treates the ndv as 0.
+    // In order to get rid of divide by 0 problem, we follow FMSketch
+    return count() > 0 ? count() : 1;
+  }
+
  /**
   * Returns the cardinality estimate. The result is cached and only
   * recomputed after a register update (invalidateCount) or while no estimate
   * has been computed yet (cachedCount < 0).
   */
  public long count() {
    // compute count only if the register values are updated else return the
    // cached count
    if (invalidateCount || cachedCount < 0) {
      if (encoding.equals(EncodingType.SPARSE)) {

        // if encoding is still SPARSE use linear counting with increased
        // accuracy (as we use pPrime bits for register index)
        int mPrime = 1 << sparseRegister.getPPrime();
        cachedCount = linearCount(mPrime, mPrime - sparseRegister.getSize());
      } else {

        // for DENSE encoding, use bias table lookup for HLLNoBias algorithm
        // else fallback to HLLOriginal algorithm
        double sum = denseRegister.getSumInversePow2();
        long numZeros = denseRegister.getNumZeroes();

        // cardinality estimate from normalized bias corrected harmonic mean on
        // the registers
        cachedCount = (long) (alphaMM * (1.0 / sum));
        long pow = (long) Math.pow(2, chosenHashBits);

        // when bias correction is enabled
        if (noBias) {
          // subtract the table-lookup bias only in the short range (<= 5m)
          cachedCount = cachedCount <= 5 * m ? (cachedCount - estimateBias(cachedCount))
              : cachedCount;
          long h = cachedCount;
          if (numZeros != 0) {
            h = linearCount(m, numZeros);
          }

          // prefer the linear-counting estimate below the per-p threshold
          if (h < getThreshold()) {
            cachedCount = h;
          }
        } else {
          // HLL algorithm shows stronger bias for values in (2.5 * m) range.
          // To compensate for this short range bias, linear counting is used
          // for values before this short range. The original paper also says
          // similar bias is seen for long range values due to hash collisions
          // in range >1/30*(2^32). For the default case, we do not have to
          // worry about this long range bias as the paper used 32-bit hashing
          // and we use 64-bit hashing as default. 2^64 values are too high to
          // observe long range bias (hash collisions).
          if (cachedCount <= 2.5 * m) {

            // for short range use linear counting
            if (numZeros != 0) {
              cachedCount = linearCount(m, numZeros);
            }
          } else if (chosenHashBits < 64 && cachedCount > (0.033333 * pow)) {

            // long range bias for 32-bit hashcodes
            // NOTE(review): (1 / 30) is integer division and evaluates to 0,
            // making this inner guard effectively "cachedCount > 0"; the
            // outer 0.033333 * pow check performs the real range test, so
            // behavior matches intent, but the expression is misleading
            if (cachedCount > (1 / 30) * pow) {
              cachedCount = (long) (-pow * Math.log(1.0 - (double) cachedCount / (double) pow));
            }
          }
        }
      }
      invalidateCount = false;
    }

    return cachedCount;
  }
+
  // per-p cutoff below which the linear-counting estimate is preferred;
  // 0.5 is added for rounding off
  private long getThreshold() {
    return (long) (HLLConstants.thresholdData[p - 4] + 0.5);
  }

  /**
   * Estimate bias from lookup table
   * @param count
   *          - cardinality before bias correction
   * @return cardinality after bias correction
   */
  private long estimateBias(long count) {
    double[] rawEstForP = HLLConstants.rawEstimateData[p - 4];

    // compute distance and store it in sorted map
    // NOTE(review): two raw estimates at the exact same squared distance
    // collide as TreeMap keys, keeping only the later index - rare with
    // real-valued table entries, but worth confirming it is acceptable
    TreeMap<Double,Integer> estIndexMap = new TreeMap<>();
    double distance = 0;
    for (int i = 0; i < rawEstForP.length; i++) {
      distance = Math.pow(count - rawEstForP[i], 2);
      estIndexMap.put(distance, i);
    }

    // take top-k closest neighbors and compute the bias corrected cardinality
    long result = 0;
    double[] biasForP = HLLConstants.biasData[p - 4];
    double biasSum = 0;
    int kNeighbors = HLLConstants.K_NEAREST_NEIGHBOR;
    for (Map.Entry<Double, Integer> entry : estIndexMap.entrySet()) {
      biasSum += biasForP[entry.getValue()];
      kNeighbors--;
      if (kNeighbors <= 0) {
        break;
      }
    }

    // average over K neighbors; 0.5 added for rounding off
    result = (long) ((biasSum / HLLConstants.K_NEAREST_NEIGHBOR) + 0.5);
    return result;
  }
+
  /**
   * Overrides the cached count.
   * NOTE(review): invalidateCount is also set to true here, so the very next
   * count() call recomputes from the registers and discards this value -
   * confirm that is the intended contract for deserialization callers.
   */
  public void setCount(long count) {
    this.cachedCount = count;
    this.invalidateCount = true;
  }

  // linear counting estimator: m * ln(m / V), where V is the number of
  // still-empty registers
  private long linearCount(int mVal, long numZeros) {
    return (long) (Math.round(mVal * Math.log(mVal / ((double) numZeros))));
  }

  // refer paper: the relative standard error of HLL is 1.04 / sqrt(m)
  public double getStandardError() {
    return 1.04 / Math.sqrt(m);
  }
+
  // returns the dense register; null while the encoding is still SPARSE
  public HLLDenseRegister getHLLDenseRegister() {
    return denseRegister;
  }

  // returns the sparse register; null after switching to DENSE
  public HLLSparseRegister getHLLSparseRegister() {
    return sparseRegister;
  }

  /**
   * Reconstruct sparse map from serialized integer list
   * @param reg
   *          - uncompressed and delta decoded integer list
   */
  public void setHLLSparseRegister(int[] reg) {
    for (int i : reg) {
      // each serialized entry packs the register index above the value bits
      int key = i >>> HLLConstants.Q_PRIME_VALUE;
      // the low 6 bits hold the register value - assumes Q_PRIME_VALUE == 6,
      // otherwise the 0x3f mask and the shift disagree (TODO confirm)
      byte value = (byte) (i & 0x3f);
      sparseRegister.set(key, value);
    }
  }
+
+  /**
+   * Reconstruct dense registers from byte array
+   * @param reg
+   *          - unpacked byte array
+   */
+  public void setHLLDenseRegister(byte[] reg) {
+    int i = 0;
+    for (byte b : reg) {
+      denseRegister.set(i, b);
+      i++;
+    }
+  }
+
  /**
   * Merge the specified hyperloglog to the current one. Both sides must use
   * the same number of register index bits (p) and the same hash bit width.
   * Encoding switches automatically to DENSE after the merge if the sparse
   * encoding switch threshold is exceeded.
   * @param hll
   *          - hyperloglog to be merged
   * @throws IllegalArgumentException if p or chosenHashBits differ
   */
  public void merge(HyperLogLog hll) {
    if (p != hll.p || chosenHashBits != hll.chosenHashBits) {
      throw new IllegalArgumentException(
          "HyperLogLog cannot be merged as either p or hashbits are different. Current: "
              + toString() + " Provided: " + hll.toString());
    }

    EncodingType otherEncoding = hll.getEncoding();

    if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.SPARSE)) {
      sparseRegister.merge(hll.getHLLSparseRegister());
      // if after merge the sparse switching threshold is exceeded then change
      // to dense encoding
      if (sparseRegister.getSize() > encodingSwitchThreshold) {
        encoding = EncodingType.DENSE;
        denseRegister = sparseToDenseRegister(sparseRegister);
        sparseRegister = null;
      }
    } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.DENSE)) {
      // both dense: delegate register-level merge to the dense register
      denseRegister.merge(hll.getHLLDenseRegister());
    } else if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.DENSE)) {
      // promote this side to dense first, then merge the other side in
      denseRegister = sparseToDenseRegister(sparseRegister);
      denseRegister.merge(hll.getHLLDenseRegister());
      sparseRegister = null;
      encoding = EncodingType.DENSE;
    } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.SPARSE)) {
      // convert the other side to a temporary dense register; this side's own
      // encoding stays DENSE
      HLLDenseRegister otherDenseRegister = sparseToDenseRegister(hll.getHLLSparseRegister());
      denseRegister.merge(otherDenseRegister);
    }

    // merged registers invalidate any cached cardinality estimate
    invalidateCount = true;
  }
+
+  /**
+   * Converts sparse to dense hll register
+   * @param sparseRegister
+   *          - sparse register to be converted
+   * @return converted dense register
+   */
+  private HLLDenseRegister sparseToDenseRegister(HLLSparseRegister 
sparseRegister) {
+    if (sparseRegister == null) {
+      return null;
+    }
+    int p = sparseRegister.getP();
+    int pMask = (1 << p) - 1;
+    HLLDenseRegister result = new HLLDenseRegister(p, bitPacking);
+    for (Map.Entry<Integer, Byte> entry : 
sparseRegister.getSparseMap().entrySet()) {
+      int key = entry.getKey();
+      int idx = key & pMask;
+      result.set(idx, entry.getValue());
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Encoding: ");
+    sb.append(encoding);
+    sb.append(", p: ");
+    sb.append(p);
+    sb.append(", estimatedCardinality: ");
+    sb.append(estimateNumDistinctValues());
+    return sb.toString();
+  }
+
+  public String toStringExtended() {
+    if (encoding.equals(EncodingType.DENSE)) {
+      return toString() + ", " + denseRegister.toExtendedString();
+    } else if (encoding.equals(EncodingType.SPARSE)) {
+      return toString() + ", " + sparseRegister.toExtendedString();
+    }
+
+    return toString();
+  }
+
  /** @return p, the number of hash bits used as the register index */
  public int getNumRegisterIndexBits() {
    return p;
  }

  /** @return the currently active register encoding (SPARSE or DENSE) */
  public EncodingType getEncoding() {
    return encoding;
  }

  /**
   * Sets the register encoding flag.
   * NOTE(review): this only switches the flag and does not convert existing
   * registers — callers must keep the registers consistent with it.
   */
  public void setEncoding(EncodingType encoding) {
    this.encoding = encoding;
  }
+
  /**
   * Equality is based on configuration (p, chosenHashBits, encoding), the
   * current cardinality estimate, and the contents of the active register.
   * Note: because the estimate is compared rather than the raw input, two
   * HLLs built from different data streams can compare equal if their
   * registers and estimates coincide. The instanceof check also handles null.
   */
  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof HyperLogLog)) {
      return false;
    }

    HyperLogLog other = (HyperLogLog) obj;
    long count = estimateNumDistinctValues();
    long otherCount = other.estimateNumDistinctValues();
    boolean result = p == other.p && chosenHashBits == other.chosenHashBits
        && encoding.equals(other.encoding) && count == otherCount;
    // only the register matching the active encoding participates; the other
    // register may be null (see merge())
    if (encoding.equals(EncodingType.DENSE)) {
      result = result && denseRegister.equals(other.getHLLDenseRegister());
    }

    if (encoding.equals(EncodingType.SPARSE)) {
      result = result && sparseRegister.equals(other.getHLLSparseRegister());
    }
    return result;
  }
+
  /**
   * Hash code over the same fields used by equals(): p, chosenHashBits,
   * encoding, the cardinality estimate, and the active register.
   * The long estimate is folded in via compound assignment (+=), which
   * performs a well-defined implicit narrowing from long to int.
   */
  @Override
  public int hashCode() {
    int hashcode = 0;
    hashcode += 31 * p;
    hashcode += 31 * chosenHashBits;
    hashcode += encoding.hashCode();
    hashcode += 31 * estimateNumDistinctValues();
    if (encoding.equals(EncodingType.DENSE)) {
      hashcode += 31 * denseRegister.hashCode();
    }

    if (encoding.equals(EncodingType.SPARSE)) {
      hashcode += 31 * sparseRegister.hashCode();
    }
    return hashcode;
  }
+
  /**
   * No-op. NOTE(review): reset() clears nothing — registers, encoding and the
   * cached count are all left untouched. Confirm whether callers of the
   * NumDistinctValueEstimator interface expect a full re-initialization here.
   */
  @Override
  public void reset() {
  }
+
+  @Override
+  public byte[] serialize() {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    // write bytes to bos ...
+    try {
+      HyperLogLogUtils.serializeHLL(bos, this);
+      byte[] result = bos.toByteArray();
+      bos.close();
+      return result;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public NumDistinctValueEstimator deserialize(byte[] buf) {
+    InputStream is = new ByteArrayInputStream(buf);
+    try {
+      HyperLogLog result = HyperLogLogUtils.deserializeHLL(is);
+      is.close();
+      return result;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
  /** Adds a long value to the estimator; delegates to addLong(v). */
  @Override
  public void addToEstimator(long v) {
    addLong(v);
  }

  /** Adds a string value to the estimator; delegates to addString(s). */
  @Override
  public void addToEstimator(String s) {
    addString(s);
  }

  /** Adds a double value to the estimator; delegates to addDouble(d). */
  @Override
  public void addToEstimator(double d) {
    addDouble(d);
  }

  /**
   * Adds a decimal to the estimator via its double value.
   * NOTE(review): distinct decimals that collapse to the same double are
   * counted once — confirm this precision loss is acceptable.
   */
  @Override
  public void addToEstimator(HiveDecimal decimal) {
    addDouble(decimal.doubleValue());
  }
+
  /**
   * Merges another estimator into this one. The argument must be a
   * HyperLogLog (see canMerge); otherwise the cast throws ClassCastException.
   */
  @Override
  public void mergeEstimators(NumDistinctValueEstimator o) {
    merge((HyperLogLog) o);
  }

  /**
   * Estimated serialized footprint: 5 header bytes plus 2^p register bytes.
   * NOTE(review): the JavaDataModel argument is ignored — confirm a
   * fixed-size estimate is intended regardless of the memory model.
   */
  @Override
  public int lengthFor(JavaDataModel model) {
    // 5 is the head, 1<<p means the number of bytes for register
    return (5 + (1 << p));
  }

  /** Only another HyperLogLog can be merged into this estimator. */
  @Override
  public boolean canMerge(NumDistinctValueEstimator o) {
    return o instanceof HyperLogLog;
  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
new file mode 100644
index 0000000..4e6510b
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.ndv.hll;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;
+
+/**
+ * HyperLogLog serialization utilities.
+ */
+public class HyperLogLogUtils {
+
+  public static final byte[] MAGIC = new byte[] { 'H', 'L', 'L' };
+
+  /**
+   * HyperLogLog is serialized using the following format
+   * 
+   * <pre>
+   * |-4 byte-|------varlong----|varint (optional)|----------|  
+   * ---------------------------------------------------------
+   * | header | estimated-count | register-length | register |
+   * ---------------------------------------------------------
+   * 
+   * <b>4 byte header</b> is encoded like below
+   * 3 bytes - HLL magic string to identify serialized stream
+   * 4 bits  - p (number of bits to be used as register index)
+   * 1       - spare bit (not used)
+   * 3 bits  - encoding (000 - sparse, 001..110 - n bit packing, 111 - no bit 
packing)
+   * 
+   * Followed by header are 3 fields that are required for reconstruction
+   * of hyperloglog
+   * Estimated count - variable length long to store last computed estimated 
count.
+   *                   This is just for quick lookup without deserializing 
registers
+   * Register length - number of entries in the register (required only for 
+   *                   for sparse representation. For bit-packing, the register
+   *                   length can be found from p)
+   * </pre>
+   * @param out
+   *          - output stream to write to
+   * @param hll
+   *          - hyperloglog that needs to be serialized
+   * @throws IOException
+   */
+  public static void serializeHLL(OutputStream out, HyperLogLog hll) throws 
IOException {
+
+    // write header
+    out.write(MAGIC);
+    int fourthByte = 0;
+    int p = hll.getNumRegisterIndexBits();
+    fourthByte = (p & 0xff) << 4;
+
+    int bitWidth = 0;
+    EncodingType enc = hll.getEncoding();
+
+    // determine bit width for bitpacking and encode it in header
+    if (enc.equals(EncodingType.DENSE)) {
+      int lzr = hll.getHLLDenseRegister().getMaxRegisterValue();
+      bitWidth = getBitWidth(lzr);
+
+      // the max value of number of zeroes for 64 bit hash can be encoded using
+      // only 6 bits. So we will disable bit packing for any values >6
+      if (bitWidth > 6) {
+        fourthByte |= 7;
+        bitWidth = 8;
+      } else {
+        fourthByte |= (bitWidth & 7);
+      }
+    }
+
+    // write fourth byte of header
+    out.write(fourthByte);
+
+    // write estimated count
+    long estCount = hll.estimateNumDistinctValues();
+    writeVulong(out, estCount);
+
+    // serialize dense/sparse registers. Dense registers are bitpacked whereas
+    // sparse registers are delta and variable length encoded
+    if (enc.equals(EncodingType.DENSE)) {
+      byte[] register = hll.getHLLDenseRegister().getRegister();
+      bitpackHLLRegister(out, register, bitWidth);
+    } else if (enc.equals(EncodingType.SPARSE)) {
+      TreeMap<Integer, Byte> sparseMap = 
hll.getHLLSparseRegister().getSparseMap();
+
+      // write the number of elements in sparse map (required for
+      // reconstruction)
+      writeVulong(out, sparseMap.size());
+
+      // compute deltas and write the values as varints
+      int prev = 0;
+      for (Map.Entry<Integer, Byte> entry : sparseMap.entrySet()) {
+        if (prev == 0) {
+          prev = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | 
entry.getValue();
+          writeVulong(out, prev);
+        } else {
+          int curr = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | 
entry.getValue();
+          int delta = curr - prev;
+          writeVulong(out, delta);
+          prev = curr;
+        }
+      }
+    }
+  }
+
+  /**
+   * Refer serializeHLL() for format of serialization. This funtions
+   * deserializes the serialized hyperloglogs
+   * @param in
+   *          - input stream
+   * @return deserialized hyperloglog
+   * @throws IOException
+   */
+  public static HyperLogLog deserializeHLL(InputStream in) throws IOException {
+    checkMagicString(in);
+    int fourthByte = in.read() & 0xff;
+    int p = fourthByte >>> 4;
+
+    // read type of encoding
+    int enc = fourthByte & 7;
+    EncodingType encoding = null;
+    int bitSize = 0;
+    if (enc == 0) {
+      encoding = EncodingType.SPARSE;
+    } else if (enc > 0 && enc < 7) {
+      bitSize = enc;
+      encoding = EncodingType.DENSE;
+    } else {
+      // bit packing disabled
+      bitSize = 8;
+      encoding = EncodingType.DENSE;
+    }
+
+    // estimated count
+    long estCount = readVulong(in);
+
+    HyperLogLog result = null;
+    if (encoding.equals(EncodingType.SPARSE)) {
+      result = HyperLogLog.builder().setNumRegisterIndexBits(p)
+          .setEncoding(EncodingType.SPARSE).build();
+      int numRegisterEntries = (int) readVulong(in);
+      int[] reg = new int[numRegisterEntries];
+      int prev = 0;
+
+      // reconstruct the sparse map from delta encoded and varint input stream
+      if (numRegisterEntries > 0) {
+        prev = (int) readVulong(in);
+        reg[0] = prev;
+      }
+      int delta = 0;
+      int curr = 0;
+      for (int i = 1; i < numRegisterEntries; i++) {
+        delta = (int) readVulong(in);
+        curr = prev + delta;
+        reg[i] = curr;
+        prev = curr;
+      }
+      result.setHLLSparseRegister(reg);
+    } else {
+
+      // explicitly disable bit packing
+      if (bitSize == 8) {
+        result = HyperLogLog.builder().setNumRegisterIndexBits(p)
+            .setEncoding(EncodingType.DENSE).enableBitPacking(false).build();
+      } else {
+        result = HyperLogLog.builder().setNumRegisterIndexBits(p)
+            .setEncoding(EncodingType.DENSE).enableBitPacking(true).build();
+      }
+      int m = 1 << p;
+      byte[] register = unpackHLLRegister(in, m, bitSize);
+      result.setHLLDenseRegister(register);
+    }
+
+    result.setCount(estCount);
+
+    return result;
+  }
+
+  private static void bitpackHLLRegister(OutputStream out, byte[] register, 
int bitWidth)
+      throws IOException {
+    int bitsLeft = 8;
+    byte current = 0;
+
+    if (bitWidth == 8) {
+      fastPathWrite(out, register);
+      return;
+    }
+
+    // write the blob
+    for (byte value : register) {
+      int bitsToWrite = bitWidth;
+      while (bitsToWrite > bitsLeft) {
+        // add the bits to the bottom of the current word
+        current |= value >>> (bitsToWrite - bitsLeft);
+        // subtract out the bits we just added
+        bitsToWrite -= bitsLeft;
+        // zero out the bits above bitsToWrite
+        value &= (1 << bitsToWrite) - 1;
+        out.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+      bitsLeft -= bitsToWrite;
+      current |= value << bitsLeft;
+      if (bitsLeft == 0) {
+        out.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+    }
+
+    out.flush();
+  }
+
+  private static void fastPathWrite(OutputStream out, byte[] register) throws 
IOException {
+    for (byte b : register) {
+      out.write(b);
+    }
+  }
+
+  /**
+   * Unpack the bitpacked HyperLogLog register.
+   * @param in
+   *          - input stream
+   * @param length
+   *          - serialized length
+   * @return unpacked HLL register
+   * @throws IOException
+   */
+  private static byte[] unpackHLLRegister(InputStream in, int length, int 
bitSize)
+      throws IOException {
+    int mask = (1 << bitSize) - 1;
+    int bitsLeft = 8;
+
+    if (bitSize == 8) {
+      return fastPathRead(in, length);
+    }
+
+    byte current = (byte) (0xff & in.read());
+
+    byte[] output = new byte[length];
+    for (int i = 0; i < output.length; i++) {
+      byte result = 0;
+      int bitsLeftToRead = bitSize;
+      while (bitsLeftToRead > bitsLeft) {
+        result <<= bitsLeft;
+        result |= current & ((1 << bitsLeft) - 1);
+        bitsLeftToRead -= bitsLeft;
+        current = (byte) (0xff & in.read());
+        bitsLeft = 8;
+      }
+      if (bitsLeftToRead > 0) {
+        result <<= bitsLeftToRead;
+        bitsLeft -= bitsLeftToRead;
+        result |= (current >>> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+      }
+      output[i] = (byte) (result & mask);
+    }
+    return output;
+  }
+
+  private static byte[] fastPathRead(InputStream in, int length) throws 
IOException {
+    byte[] result = new byte[length];
+    for (int i = 0; i < length; i++) {
+      result[i] = (byte) in.read();
+    }
+    return result;
+  }
+
+  /**
+   * Get estimated cardinality without deserializing HLL
+   * @param in
+   *          - serialized HLL
+   * @return - cardinality
+   * @throws IOException
+   */
+  public static long getEstimatedCountFromSerializedHLL(InputStream in) throws 
IOException {
+    checkMagicString(in);
+    in.read();
+    return readVulong(in);
+  }
+
+  /**
+   * Check if the specified input stream is actually a HLL stream
+   * @param in
+   *          - input stream
+   * @throws IOException
+   */
+  private static void checkMagicString(InputStream in) throws IOException {
+    byte[] magic = new byte[3];
+    magic[0] = (byte) in.read();
+    magic[1] = (byte) in.read();
+    magic[2] = (byte) in.read();
+
+    if (!Arrays.equals(magic, MAGIC)) {
+      throw new IllegalArgumentException("The input stream is not a 
HyperLogLog stream.");
+    }
+  }
+
+  /**
+   * Minimum bits required to encode the specified value
+   * @param val
+   *          - input value
+   * @return
+   */
+  private static int getBitWidth(int val) {
+    int count = 0;
+    while (val != 0) {
+      count++;
+      val = (byte) (val >>> 1);
+    }
+    return count;
+  }
+
+  /**
+   * Return relative error between actual and estimated cardinality
+   * @param actualCount
+   *          - actual count
+   * @param estimatedCount
+   *          - estimated count
+   * @return relative error
+   */
+  public static float getRelativeError(long actualCount, long estimatedCount) {
+    float err = (1.0f - ((float) estimatedCount / (float) actualCount)) * 
100.0f;
+    return err;
+  }
+
+  /**
+   * Write variable length encoded longs to output stream
+   * @param output
+   *          - out stream
+   * @param value
+   *          - long
+   * @throws IOException
+   */
+  private static void writeVulong(OutputStream output, long value) throws 
IOException {
+    while (true) {
+      if ((value & ~0x7f) == 0) {
+        output.write((byte) value);
+        return;
+      } else {
+        output.write((byte) (0x80 | (value & 0x7f)));
+        value >>>= 7;
+      }
+    }
+  }
+
+  /**
+   * Read variable length encoded longs from input stream
+   * @param in
+   *          - input stream
+   * @return decoded long value
+   * @throws IOException
+   */
+  private static long readVulong(InputStream in) throws IOException {
+    long result = 0;
+    long b;
+    int offset = 0;
+    do {
+      b = in.read();
+      if (b == -1) {
+        throw new EOFException("Reading Vulong past EOF");
+      }
+      result |= (0x7f & b) << offset;
+      offset += 7;
+    } while (b >= 0x80);
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java
new file mode 100644
index 0000000..2e00005
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
/**
 * This class is used to monitor long running methods in a thread.
 * It is recommended to use it as a ThreadLocal variable.
 */
public class Deadline {
  private static final Logger LOG = LoggerFactory.getLogger(Deadline.class.getName());

  /**
   * Timeout in nanoseconds; its value is init from conf, and could be reset
   * from the client via resetTimeout().
   */
  private long timeoutNanos;

  /**
   * System.nanoTime() at which the current method started, or NO_DEADLINE
   * when no timer is running; it is reset before executing a method.
   */
  private long startTime = NO_DEADLINE;

  /**
   * The name of the public HMSHandler method currently being timed.
   */
  private String method;

  private Deadline(long timeoutMs) {
    // store as nanoseconds so it compares directly against nanoTime() deltas
    this.timeoutNanos = timeoutMs * 1000000L;
  }

  /**
   * Deadline object per thread. (initialValue() returning null matches the
   * ThreadLocal default; kept explicit for readability.)
   */
  private static final ThreadLocal<Deadline> DEADLINE_THREAD_LOCAL = new ThreadLocal<Deadline>() {
        @Override
        protected Deadline initialValue() {
          return null;
        }
      };

  private static void setCurrentDeadline(Deadline deadline) {
    DEADLINE_THREAD_LOCAL.set(deadline);
  }

  static Deadline getCurrentDeadline() {
    return DEADLINE_THREAD_LOCAL.get();
  }

  private static void removeCurrentDeadline() {
    DEADLINE_THREAD_LOCAL.remove();
  }

  /**
   * register a Deadline threadlocal object to current thread. No-op when one
   * is already registered: the existing timeout is kept, not overwritten.
   * @param timeout timeout in milliseconds
   */
  public static void registerIfNot(long timeout) {
    if (getCurrentDeadline() == null) {
      setCurrentDeadline(new Deadline(timeout));
    }
  }

  /**
   * reset the timeout value of this timer.
   * @param timeoutMs new timeout in milliseconds; must be positive
   * @throws MetaException if the value is not positive or no Deadline has
   *           been registered for this thread
   */
  public static void resetTimeout(long timeoutMs) throws MetaException {
    if (timeoutMs <= 0) {
      throw MetaStoreUtils.newMetaException(new DeadlineException("The reset timeout value should be " +
          "larger than 0: " + timeoutMs));
    }
    Deadline deadline = getCurrentDeadline();
    if (deadline != null) {
      deadline.timeoutNanos = timeoutMs * 1000000L;
    } else {
      throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," +
          " please register it first."));
    }
  }

  /**
   * start the timer before a method is invoked.
   * @param method method to be invoked
   * @return true if this call started the timer; false if a timer was already
   *         running (nested call), in which case the outer timer is kept
   * @throws MetaException if no Deadline has been registered for this thread
   */
  public static boolean startTimer(String method) throws MetaException {
    Deadline deadline = getCurrentDeadline();
    if (deadline == null) {
      throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," +
          " please register it first."));
    }
    if (deadline.startTime != NO_DEADLINE) return false;
    deadline.method = method;
    do {
      // loop guards against the (theoretical) case where nanoTime() returns
      // exactly the NO_DEADLINE sentinel value
      deadline.startTime = System.nanoTime();
    } while (deadline.startTime == NO_DEADLINE);
    return true;
  }

  /**
   * end the time after a method is done; clears the running-timer state.
   * @throws MetaException if no Deadline has been registered for this thread
   */
  public static void stopTimer() throws MetaException {
    Deadline deadline = getCurrentDeadline();
    if (deadline != null) {
      deadline.startTime = NO_DEADLINE;
      deadline.method = null;
    } else {
      throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," +
          " please register it first."));
    }
  }

  /**
   * remove the registered Deadline threadlocal object from current thread.
   */
  public static void clear() {
    removeCurrentDeadline();
  }

  /**
   * Check whether the long running method has timed out.
   * @throws MetaException when the method has timed out, when the timer was
   *           never started, or when no Deadline is registered
   */
  public static void checkTimeout() throws MetaException {
    Deadline deadline = getCurrentDeadline();
    if (deadline != null) {
      deadline.check();
    } else {
      throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," +
          " please register it first."));
    }
  }

  // sentinel meaning "no timer running"; Long.MIN_VALUE cannot collide with a
  // realistic nanoTime() reading (and startTimer() re-reads if it ever does)
  private static final long NO_DEADLINE = Long.MIN_VALUE;

  // compares elapsed time against the timeout; all failures are wrapped as
  // MetaException so thrift-facing callers see a single exception type
  private void check() throws MetaException{
    try {
      if (startTime == NO_DEADLINE) {
        throw new DeadlineException("Should execute startTimer() method before " +
            "checkTimeout. Error happens in method: " + method);
      }
      long elapsedTime = System.nanoTime() - startTime;
      if (elapsedTime > timeoutNanos) {
        throw new DeadlineException("Timeout when executing method: " + method + "; "
            + (elapsedTime / 1000000L) + "ms exceeds " + (timeoutNanos / 1000000L)  + "ms");
      }
    } catch (DeadlineException e) {
      throw MetaStoreUtils.newMetaException(e);
    }
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java
new file mode 100644
index 0000000..bfff89d
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
/**
 * Thrown when a long running method timeout is checked.
 */
public class DeadlineException extends Exception {

  // Exception is Serializable; declare an explicit serialVersionUID so the
  // serialized form does not silently change with unrelated edits
  private static final long serialVersionUID = 1L;

  /**
   * @param message detail describing which method timed out and by how much
   */
  public DeadlineException(String message) {
    super(message);
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
new file mode 100644
index 0000000..4c14ab0
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+
/**
 * The base implementation of a file metadata handler for a specific file type.
 * There are currently two classes for each file type (of 1), this one, which is very simple due
 * to the fact that it just calls the proxy class for most calls; and the proxy class, that
 * contains the actual implementation that depends on some stuff in QL (for ORC).
 */
public abstract class FileMetadataHandler {
  protected static final Log LOG = LogFactory.getLog(FileMetadataHandler.class);

  // injected via configure(); conf is stored but not read in this base class
  private Configuration conf;
  private PartitionExpressionProxy expressionProxy;
  private FileFormatProxy fileFormatProxy;
  private MetadataStore store;

  /**
   * Same as RawStore.getFileMetadataByExpr.
   */
  public abstract void getFileMetadataByExpr(List<Long> fileIds, byte[] expr,
      ByteBuffer[] metadatas, ByteBuffer[] results, boolean[] eliminated) throws IOException;

  /** @return the file format this handler is responsible for (e.g. ORC) */
  protected abstract FileMetadataExprType getType();

  protected PartitionExpressionProxy getExpressionProxy() {
    return expressionProxy;
  }

  protected FileFormatProxy getFileFormatProxy() {
    return fileFormatProxy;
  }

  protected MetadataStore getStore() {
    return store;
  }

  /**
   * Configures the handler. Called once before use.
   * @param conf Config.
   * @param expressionProxy Expression proxy to access ql stuff.
   * @param store Storage interface to manipulate the metadata.
   */
  public void configure(
      Configuration conf, PartitionExpressionProxy expressionProxy, MetadataStore store) {
    this.conf = conf;
    this.expressionProxy = expressionProxy;
    this.store = store;
    // the format-specific proxy is resolved from the subclass's type
    this.fileFormatProxy = expressionProxy.getFileFormatProxy(getType());
  }

  /**
   * Caches the file metadata for a particular file.
   * @param fileId File id.
   * @param fs The filesystem of the file.
   * @param path Path to the file.
   */
  public void cacheFileMetadata(long fileId, FileSystem fs, Path path)
      throws IOException, InterruptedException {
    // ORC is in ql, so we cannot do anything here. For now, all the logic is in the proxy.
    ByteBuffer[] cols = fileFormatProxy.getAddedColumnsToCache();
    ByteBuffer[] vals = (cols == null) ? null : new ByteBuffer[cols.length];
    ByteBuffer metadata = fileFormatProxy.getMetadataToCache(fs, path, vals);
    LOG.info("Caching file metadata for " + path + ", size " + metadata.remaining());
    store.storeFileMetadata(fileId, metadata, cols, vals);
  }

  /**
   * @return the added column names to be cached in metastore with the metadata for this type.
   */
  public ByteBuffer[] createAddedCols() {
    return fileFormatProxy.getAddedColumnsToCache();
  }

  /**
   * @return the values for the added columns returned by createAddedCols for respective metadatas.
   */
  public ByteBuffer[][] createAddedColVals(List<ByteBuffer> metadata) {
    return fileFormatProxy.getAddedValuesToCache(metadata);
  }
}

Reply via email to