http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java new file mode 100644 index 0000000..c52746e --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.hive.common.ndv.hll;

import java.util.Arrays;

/**
 * DENSE register layout for HyperLogLog: one byte per register slot, 2^p
 * slots in total. Alongside the raw register bytes it maintains three
 * derived values so the estimator never has to rescan the array:
 * the maximum register value (drives the bit width chosen when bit packing
 * during serialization), the number of zero-valued slots (feeds linear
 * counting), and a per-slot cache of 2^-value used for the harmonic-mean
 * sum. Not thread-safe.
 */
public class HLLDenseRegister implements HLLRegister {

  // 2^p bytes, one register value per slot
  private byte[] register;

  // largest value seen so far; serialization picks its bit width from this
  private int maxRegisterValue;

  // how many slots still hold 0 (needed for linear counting)
  private int numZeroes;

  // cached 2^-register[i] for every slot
  private double[] invPow2Register;

  // number of hashcode bits used as the register index
  private int p;

  // number of registers, m = 2^p
  private int m;

  public HLLDenseRegister(int p) {
    this(p, true);
  }

  public HLLDenseRegister(int p, boolean bitPack) {
    this.p = p;
    this.m = 1 << p;
    this.register = new byte[m];
    this.invPow2Register = new double[m];
    // every register starts at 0 and 2^-0 == 1.0
    Arrays.fill(invPow2Register, 1.0);
    this.maxRegisterValue = 0;
    this.numZeroes = m;
    // with bit packing disabled each register occupies a full byte, so the
    // max value is forced to 0xff to make serialization use 8-bit slots
    if (!bitPack) {
      this.maxRegisterValue = 0xff;
    }
  }

  /**
   * Fold one 64-bit hashcode into the register set.
   * @param hashcode - hashcode to add
   * @return true if a register value was raised
   */
  public boolean add(long hashcode) {
    // low p bits select the register slot
    final int slot = (int) (hashcode & (m - 1));

    // the remaining 64 - p bits supply the trailing-zero run
    final long rest = hashcode >>> p;

    // run length of trailing zeroes, 1-based
    final int runLength = Long.numberOfTrailingZeros(rest) + 1;
    return set(slot, (byte) runLength);
  }

  /**
   * Raise register {@code idx} to {@code value} if it is larger than the
   * current value; keeps maxRegisterValue, numZeroes and the inverse-power
   * cache in sync.
   * @return true if the register was updated
   */
  public boolean set(int idx, byte value) {
    // out-of-range index or a smaller/equal value leaves the register alone
    if (idx >= register.length || value <= register[idx]) {
      return false;
    }

    if (value > maxRegisterValue) {
      maxRegisterValue = value;
    }

    // slot transitions from zero to non-zero
    if (register[idx] == 0 && value > 0) {
      numZeroes--;
    }

    register[idx] = value;
    invPow2Register[idx] = Math.pow(2, -value);
    return true;
  }

  public int size() {
    return register.length;
  }

  public int getNumZeroes() {
    return numZeroes;
  }

  /**
   * Merge another DENSE register into this one by taking the per-slot
   * maximum. Both registers must have the same length.
   * @throws IllegalArgumentException on type or size mismatch
   */
  public void merge(HLLRegister hllRegister) {
    if (!(hllRegister instanceof HLLDenseRegister)) {
      throw new IllegalArgumentException("Specified register is not instance of HLLDenseRegister");
    }
    HLLDenseRegister other = (HLLDenseRegister) hllRegister;
    byte[] incoming = other.getRegister();

    // merge only if the register length matches
    if (register.length != incoming.length) {
      throw new IllegalArgumentException(
          "The size of register sets of HyperLogLogs to be merged does not match.");
    }

    // keep the larger of the two values in every slot
    for (int i = 0; i < incoming.length; i++) {
      byte candidate = incoming[i];
      if (candidate > register[i]) {
        if (register[i] == 0) {
          numZeroes--;
        }
        register[i] = candidate;
        invPow2Register[i] = Math.pow(2, -candidate);
      }
    }

    if (other.getMaxRegisterValue() > maxRegisterValue) {
      maxRegisterValue = other.getMaxRegisterValue();
    }
  }

  public byte[] getRegister() {
    return register;
  }

  // NOTE(review): replaces the backing array without refreshing numZeroes,
  // maxRegisterValue or invPow2Register — presumably only used during
  // deserialization; confirm callers rebuild state via set() if needed.
  public void setRegister(byte[] register) {
    this.register = register;
  }

  public int getMaxRegisterValue() {
    return maxRegisterValue;
  }

  /** Sum of 2^-register[i] over all slots (harmonic-mean input). */
  public double getSumInversePow2() {
    double sum = 0;
    for (int i = 0; i < invPow2Register.length; i++) {
      sum += invPow2Register[i];
    }
    return sum;
  }

  @Override
  public String toString() {
    return "HLLDenseRegister - " + "p: " + p + " numZeroes: " + numZeroes
        + " maxRegisterValue: " + maxRegisterValue;
  }

  public String toExtendedString() {
    return toString() + " register: " + Arrays.toString(register);
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof HLLDenseRegister)) {
      return false;
    }
    HLLDenseRegister other = (HLLDenseRegister) obj;
    return numZeroes == other.numZeroes && maxRegisterValue == other.maxRegisterValue
        && Arrays.equals(register, other.register);
  }

  @Override
  public int hashCode() {
    return 31 * numZeroes + 31 * maxRegisterValue + Arrays.hashCode(register);
  }

}
package org.apache.hadoop.hive.common.ndv.hll;

/**
 * Contract shared by the SPARSE and DENSE HyperLogLog register
 * representations. A register set accepts 64-bit hashcodes (or raw
 * index/value pairs when rebuilding from a serialized form) and can merge
 * with another register of the same representation.
 */
public interface HLLRegister {

  /**
   * Specify a hashcode to add to hyperloglog register.
   * @param hashcode
   *          - hashcode to add
   * @return true if register value is updated else false
   */
  boolean add(long hashcode);

  /**
   * Instead of specifying hashcode, this interface can be used to directly
   * specify the register index and register value. This interface is useful
   * when reconstructing hyperloglog from a serialized representation where
   * it's not possible to regenerate the hashcode.
   * @param idx
   *          - register index
   * @param value
   *          - register value
   * @return true if register value is updated else false
   */
  boolean set(int idx, byte value);

  /**
   * Merge hyperloglog registers of the same type (SPARSE or DENSE register).
   * Implementations throw IllegalArgumentException on a representation
   * mismatch.
   * @param reg
   *          - register to be merged
   */
  void merge(HLLRegister reg);
}
package org.apache.hadoop.hive.common.ndv.hll;

import java.util.Map;
import java.util.TreeMap;

/**
 * SPARSE register layout for HyperLogLog (HLL++). Instead of materializing
 * 2^p byte registers, entries are kept in a sorted map keyed by a pPrime-bit
 * register index, which is compact while the cardinality is small. For
 * insertion performance, encoded hashes are first buffered in an unsorted
 * temp list and flushed into the map in batches. Not thread-safe.
 */
public class HLLSparseRegister implements HLLRegister {

  // register index (pPrime bits) -> number-of-trailing-zeroes value
  private TreeMap<Integer, Byte> sparseMap;

  // for a better insertion performance values are added to temporary unsorted
  // list which will be merged to sparse map after a threshold
  private int[] tempList;
  private int tempListIdx;

  // number of register bits
  private final int p;

  // new number of register bits for higher accuracy
  private final int pPrime;

  // number of bits to store the number of zero runs
  private final int qPrime;

  // masks for quicker extraction of p, pPrime, qPrime values
  private final int mask;
  private final int pPrimeMask;
  private final int qPrimeMask;

  public HLLSparseRegister(int p, int pp, int qp) {
    this.p = p;
    this.sparseMap = new TreeMap<>();
    this.tempList = new int[HLLConstants.TEMP_LIST_DEFAULT_SIZE];
    this.tempListIdx = 0;
    this.pPrime = pp;
    this.qPrime = qp;
    // bits between p (exclusive) and pPrime (exclusive upper bound)
    this.mask = ((1 << pPrime) - 1) ^ ((1 << p) - 1);
    this.pPrimeMask = ((1 << pPrime) - 1);
    this.qPrimeMask = (1 << qPrime) - 1;
  }

  /**
   * Encode the hashcode and buffer it in the temp list, flushing the list to
   * the sparse map when it is full.
   * @param hashcode
   *          - hashcode to add
   * @return true if register value is updated else false
   */
  public boolean add(long hashcode) {
    boolean updated = false;

    // fill the temp list before merging to sparse map
    if (tempListIdx < tempList.length) {
      int encodedHash = encodeHash(hashcode);
      tempList[tempListIdx++] = encodedHash;
      updated = true;
    } else {
      // BUGFIX: the previous implementation flushed the temp list here but
      // never recorded the incoming hashcode, silently dropping one value
      // every time the list filled up. Flush, then store it in the now-empty
      // temp list.
      mergeTempListToSparseMap();
      tempList[tempListIdx++] = encodeHash(hashcode);
      updated = true;
    }

    return updated;
  }

  /**
   * Adds temp list to sparse map. The key for sparse map entry is the register
   * index determined by pPrime and value is the number of trailing zeroes.
   * @return true if any register value was updated by the last entry processed
   */
  private boolean mergeTempListToSparseMap() {
    boolean updated = false;
    for (int i = 0; i < tempListIdx; i++) {
      int encodedHash = tempList[i];
      int key = encodedHash & pPrimeMask;
      byte value = (byte) (encodedHash >>> pPrime);
      byte nr = 0;
      // if MSB is set to 1 then next qPrime MSB bits contains the value of
      // number of zeroes.
      // if MSB is set to 0 then number of zeroes is contained within pPrime - p
      // bits.
      if (encodedHash < 0) {
        nr = (byte) (value & qPrimeMask);
      } else {
        nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
      }
      updated = set(key, nr);
    }

    // reset temp list index
    tempListIdx = 0;
    return updated;
  }

  /**
   * <pre>
   * <b>Input:</b> 64 bit hashcode
   *
   * |---------w-------------| |------------p'----------|
   * 10101101.......1010101010 10101010101 01010101010101
   *                                       |------p-----|
   *
   * <b>Output:</b> 32 bit int
   *
   * |b| |-q'-|  |------------p'----------|
   *  1  010101  01010101010 10101010101010
   *                         |------p-----|
   *
   *
   * The default values of p', q' and b are 25, 6, 1 (total 32 bits) respectively.
   * This function will return an int encoded in the following format
   *
   * p  - LSB p bits represent the register index
   * p' - LSB p' bits are used for increased accuracy in estimation
   * q' - q' bits after p' are left as such from the hashcode if b = 0 else
   *      q' bits encodes the longest trailing zero runs from in (w-p) input bits
   * b  - 0 if longest trailing zero run is contained within (p'-p) bits
   *      1 if longest trailing zero run is computed from (w-p) input bits and
   *      its value is stored in q' bits
   * </pre>
   * @param hashcode 64-bit input hash
   * @return 32-bit encoded form as described above
   */
  public int encodeHash(long hashcode) {
    // x = p' - p
    int x = (int) (hashcode & mask);
    if (x == 0) {
      // more bits should be considered for finding q (longest zero runs)
      // set MSB to 1
      // NOTE(review): signed shift (>>) here vs unsigned (>>>) in the dense
      // register — only affects bits above p, which are masked off below;
      // kept as-is to preserve the serialized encoding.
      int ntr = Long.numberOfTrailingZeros(hashcode >> p) + 1;
      long newHashCode = hashcode & pPrimeMask;
      newHashCode |= ntr << pPrime;
      newHashCode |= 0x80000000;
      return (int) newHashCode;
    } else {
      // q is contained within p' - p
      // set MSB to 0
      return (int) (hashcode & 0x7FFFFFFF);
    }
  }

  /**
   * Number of distinct register entries. Flushes the temp list first so
   * buffered values are counted.
   */
  public int getSize() {

    // merge temp list before getting the size of sparse map
    if (tempListIdx != 0) {
      mergeTempListToSparseMap();
    }
    return sparseMap.size();
  }

  /**
   * Merge another SPARSE register into this one, retaining the larger value
   * per register index.
   * @throws IllegalArgumentException if the argument is not a sparse register
   */
  public void merge(HLLRegister hllRegister) {
    if (hllRegister instanceof HLLSparseRegister) {
      HLLSparseRegister hsr = (HLLSparseRegister) hllRegister;

      // BUGFIX: use the merged map so entries still sitting in the other
      // register's temp list are not lost during merge (getSparseMap() would
      // ignore them).
      for (Map.Entry<Integer, Byte> entry : hsr.getMergedSparseMap().entrySet()) {
        int key = entry.getKey();
        byte value = entry.getValue();
        set(key, value);
      }
    } else {
      throw new IllegalArgumentException("Specified register not instance of HLLSparseRegister");
    }
  }

  /**
   * Store {@code value} for register index {@code key}, retaining only the
   * largest value per index.
   * @return true if the map was inserted into or updated
   */
  public boolean set(int key, byte value) {
    boolean updated = false;

    // retain only the largest value for a register index
    if (sparseMap.containsKey(key)) {
      byte containedVal = sparseMap.get(key);
      if (value > containedVal) {
        sparseMap.put(key, value);
        updated = true;
      }
    } else {
      sparseMap.put(key, value);
      updated = true;
    }
    return updated;
  }

  // NOTE(review): does not flush the temp list; use getMergedSparseMap() when
  // a complete view is required.
  public TreeMap<Integer, Byte> getSparseMap() {
    return sparseMap;
  }

  /** Returns the sparse map after flushing any pending temp-list entries. */
  public TreeMap<Integer, Byte> getMergedSparseMap() {
    if (tempListIdx != 0) {
      mergeTempListToSparseMap();
    }
    return sparseMap;
  }

  public int getP() {
    return p;
  }

  public int getPPrime() {
    return pPrime;
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("HLLSparseRegister - ");
    sb.append("p: ");
    sb.append(p);
    sb.append(" pPrime: ");
    sb.append(pPrime);
    sb.append(" qPrime: ");
    sb.append(qPrime);
    return sb.toString();
  }

  public String toExtendedString() {
    return toString() + " register: " + sparseMap.toString();
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof HLLSparseRegister)) {
      return false;
    }
    HLLSparseRegister other = (HLLSparseRegister) obj;
    boolean result = p == other.p && pPrime == other.pPrime && qPrime == other.qPrime
        && tempListIdx == other.tempListIdx;
    if (result) {
      for (int i = 0; i < tempListIdx; i++) {
        if (tempList[i] != other.tempList[i]) {
          return false;
        }
      }

      result = result && sparseMap.equals(other.sparseMap);
    }
    return result;
  }

  @Override
  public int hashCode() {
    int hashcode = 0;
    hashcode += 31 * p;
    hashcode += 31 * pPrime;
    hashcode += 31 * qPrime;
    for (int i = 0; i < tempListIdx; i++) {
      // BUGFIX: was tempList[tempListIdx] (constant, and one past the last
      // valid entry) — every pending entry must contribute to the hash.
      hashcode += 31 * tempList[i];
    }
    hashcode += sparseMap.hashCode();
    return hashcode;
  }

}
package org.apache.hadoop.hive.common.ndv.hll;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hive.common.util.Murmur3;

/**
 * <pre>
 * This is an implementation of the following variants of hyperloglog (HLL)
 * algorithm
 * Original  - Original HLL algorithm from Flajolet et. al from
 *             http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
 * HLLNoBias - Google's implementation of bias correction based on lookup table
 *             http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
 * HLL++     - Google's implementation of HLL++ algorithm that uses SPARSE registers
 *             http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
 *
 * Following are the constructor parameters that determines which algorithm is
 * used
 * <b>numRegisterIndexBits</b> - number of LSB hashcode bits to be used as register index.
 *                               <i>Default is 14</i>. min = 4 and max = 16
 * <b>numHashBits</b> - number of bits for hashcode. <i>Default is 64</i>. min = 32 and max = 128
 * <b>encoding</b> - Type of encoding to use (SPARSE or DENSE). The algorithm automatically
 *                   switches to DENSE beyond a threshold. <i>Default: SPARSE</i>
 * <b>enableBitPacking</b> - To enable bit packing or not. Bit packing improves compression
 *                           at the cost of more CPU cycles. <i>Default: true</i>
 * <b>noBias</b> - Use Google's bias table lookup for short range bias correction.
 *                 Enabling this will highly improve the estimation accuracy for short
 *                 range values. <i>Default: true</i>
 *
 * NOTE(review): instances are NOT thread-safe; in addition the add* helpers
 * below share static scratch ByteBuffers, so even distinct instances must not
 * be fed from multiple threads concurrently — confirm callers are
 * single-threaded.
 * </pre>
 */
public class HyperLogLog implements NumDistinctValueEstimator {
  private final static int DEFAULT_HASH_BITS = 64;
  // precomputed hashes for the two boolean values
  private final static long HASH64_ZERO = Murmur3.hash64(new byte[] {0});
  private final static long HASH64_ONE = Murmur3.hash64(new byte[] {1});
  // NOTE(review): shared mutable scratch buffers — this makes addShort/addInt/
  // addLong/addFloat/addDouble/addChar unsafe under concurrent use across ALL
  // HyperLogLog instances (they are static).
  private final static ByteBuffer SHORT_BUFFER = ByteBuffer.allocate(Short.BYTES);
  private final static ByteBuffer INT_BUFFER = ByteBuffer.allocate(Integer.BYTES);
  private final static ByteBuffer LONG_BUFFER = ByteBuffer.allocate(Long.BYTES);

  public enum EncodingType {
    SPARSE, DENSE
  }

  // number of bits to address registers
  private final int p;

  // number of registers - 2^p
  private final int m;

  // refer paper (alpha * m^2 normalization constant)
  private float alphaMM;

  // enable/disable bias correction using table lookup
  private final boolean noBias;

  // enable/disable bitpacking
  private final boolean bitPacking;

  // Not making it configurable for perf reasons (avoid checks)
  private final int chosenHashBits = DEFAULT_HASH_BITS;

  private HLLDenseRegister denseRegister;
  private HLLSparseRegister sparseRegister;

  // counts are cached to avoid repeated complex computation. If register value
  // is updated the count will be computed again.
  private long cachedCount;
  private boolean invalidateCount;

  private EncodingType encoding;

  // threshold to switch from SPARSE to DENSE encoding
  private int encodingSwitchThreshold;

  /**
   * Built only via {@link HyperLogLogBuilder}; validates p and sets up either
   * the sparse or dense register depending on the requested encoding.
   */
  private HyperLogLog(HyperLogLogBuilder hllBuilder) {
    if (hllBuilder.numRegisterIndexBits < HLLConstants.MIN_P_VALUE
        || hllBuilder.numRegisterIndexBits > HLLConstants.MAX_P_VALUE) {
      throw new IllegalArgumentException("p value should be between " + HLLConstants.MIN_P_VALUE
          + " to " + HLLConstants.MAX_P_VALUE);
    }
    this.p = hllBuilder.numRegisterIndexBits;
    this.m = 1 << p;
    this.noBias = hllBuilder.noBias;
    this.bitPacking = hllBuilder.bitPacking;

    // the threshold should be less than 12K bytes for p = 14.
    // The reason to divide by 5 is, in sparse mode after serialization the
    // entries in sparse map are compressed, and delta encoded as varints. The
    // worst case size of varints are 5 bytes. Hence, 12K/5 ~= 2400 entries in
    // sparse map.
    if (bitPacking) {
      this.encodingSwitchThreshold = ((m * 6) / 8) / 5;
    } else {
      // if bitpacking is disabled, all register values takes 8 bits and hence
      // we can be more flexible with the threshold. For p=14, 16K/5 = 3200
      // entries in sparse map can be allowed.
      this.encodingSwitchThreshold = m / 3;
    }

    // initializeAlpha(DEFAULT_HASH_BITS);
    // alphaMM value for 128 bits hash seems to perform better for default 64 hash bits
    this.alphaMM = 0.7213f / (1 + 1.079f / m);
    // For efficiency alpha is multiplied by m^2
    this.alphaMM = this.alphaMM * m * m;

    this.cachedCount = -1;
    this.invalidateCount = false;
    this.encoding = hllBuilder.encoding;
    if (encoding.equals(EncodingType.SPARSE)) {
      this.sparseRegister = new HLLSparseRegister(p, HLLConstants.P_PRIME_VALUE,
          HLLConstants.Q_PRIME_VALUE);
      this.denseRegister = null;
    } else {
      this.sparseRegister = null;
      this.denseRegister = new HLLDenseRegister(p, bitPacking);
    }
  }

  public static HyperLogLogBuilder builder() {
    return new HyperLogLogBuilder();
  }

  /** Fluent builder; defaults: p=14, SPARSE encoding, bit packing and noBias on. */
  public static class HyperLogLogBuilder {
    private int numRegisterIndexBits = 14;
    private EncodingType encoding = EncodingType.SPARSE;
    private boolean bitPacking = true;
    private boolean noBias = true;

    public HyperLogLogBuilder() {
    }

    public HyperLogLogBuilder setNumRegisterIndexBits(int b) {
      this.numRegisterIndexBits = b;
      return this;
    }

    public HyperLogLogBuilder setEncoding(EncodingType enc) {
      this.encoding = enc;
      return this;
    }

    public HyperLogLogBuilder enableBitPacking(boolean b) {
      this.bitPacking = b;
      return this;
    }

    public HyperLogLogBuilder enableNoBias(boolean nb) {
      this.noBias = nb;
      return this;
    }

    public HyperLogLog build() {
      return new HyperLogLog(this);
    }
  }

  // see paper for alpha initialization.
  // NOTE(review): currently unused — the constructor hard-codes the 128-bit
  // alpha formula instead (see the commented-out call there).
  private void initializeAlpha(final int hashBits) {
    if (hashBits <= 16) {
      alphaMM = 0.673f;
    } else if (hashBits <= 32) {
      alphaMM = 0.697f;
    } else if (hashBits <= 64) {
      alphaMM = 0.709f;
    } else {
      alphaMM = 0.7213f / (float) (1 + 1.079f / m);
    }

    // For efficiency alpha is multiplied by m^2
    alphaMM = alphaMM * m * m;
  }

  public void addBoolean(boolean val) {
    add(val ? HASH64_ONE : HASH64_ZERO);
  }

  public void addByte(byte val) {
    add(Murmur3.hash64(new byte[] {val}));
  }

  public void addBytes(byte[] val) {
    add(Murmur3.hash64(val));
  }

  public void addShort(short val) {
    SHORT_BUFFER.putShort(0, val);
    add(Murmur3.hash64(SHORT_BUFFER.array()));
  }

  public void addInt(int val) {
    INT_BUFFER.putInt(0, val);
    add(Murmur3.hash64(INT_BUFFER.array()));
  }

  public void addLong(long val) {
    LONG_BUFFER.putLong(0, val);
    add(Murmur3.hash64(LONG_BUFFER.array()));
  }

  public void addFloat(float val) {
    INT_BUFFER.putFloat(0, val);
    add(Murmur3.hash64(INT_BUFFER.array()));
  }

  public void addDouble(double val) {
    LONG_BUFFER.putDouble(0, val);
    add(Murmur3.hash64(LONG_BUFFER.array()));
  }

  public void addChar(char val) {
    SHORT_BUFFER.putChar(0, val);
    add(Murmur3.hash64(SHORT_BUFFER.array()));
  }

  /**
   * Java's default charset will be used for strings.
   * NOTE(review): platform-default charset makes the resulting hash (and thus
   * serialized sketches) JVM-configuration dependent; prefer the overload
   * taking an explicit Charset.
   * @param val
   *          - input string
   */
  public void addString(String val) {
    add(Murmur3.hash64(val.getBytes()));
  }

  public void addString(String val, Charset charset) {
    add(Murmur3.hash64(val.getBytes(charset)));
  }

  /**
   * Fold a precomputed 64-bit hashcode into the sketch. Switches from SPARSE
   * to DENSE encoding once the sparse map outgrows the switch threshold.
   */
  public void add(long hashcode) {
    if (encoding.equals(EncodingType.SPARSE)) {
      if (sparseRegister.add(hashcode)) {
        invalidateCount = true;
      }

      // if size of sparse map excess the threshold convert the sparse map to
      // dense register and switch to DENSE encoding
      if (sparseRegister.getSize() > encodingSwitchThreshold) {
        encoding = EncodingType.DENSE;
        denseRegister = sparseToDenseRegister(sparseRegister);
        sparseRegister = null;
        invalidateCount = true;
      }
    } else {
      if (denseRegister.add(hashcode)) {
        invalidateCount = true;
      }
    }
  }

  public long estimateNumDistinctValues() {
    // FMSketch treats the ndv of all nulls as 1 but hll treats the ndv as 0.
    // In order to get rid of divide by 0 problem, we follow FMSketch
    return count() > 0 ? count() : 1;
  }

  /**
   * Cardinality estimate. SPARSE uses linear counting over the pPrime-bit
   * register space; DENSE uses the harmonic-mean estimator with either the
   * bias-table correction (noBias) or the original paper's range corrections.
   * The result is cached until a register changes.
   */
  public long count() {
    // compute count only if the register values are updated else return the
    // cached count
    if (invalidateCount || cachedCount < 0) {
      if (encoding.equals(EncodingType.SPARSE)) {

        // if encoding is still SPARSE use linear counting with increase
        // accuracy (as we use pPrime bits for register index)
        int mPrime = 1 << sparseRegister.getPPrime();
        cachedCount = linearCount(mPrime, mPrime - sparseRegister.getSize());
      } else {

        // for DENSE encoding, use bias table lookup for HLLNoBias algorithm
        // else fallback to HLLOriginal algorithm
        double sum = denseRegister.getSumInversePow2();
        long numZeros = denseRegister.getNumZeroes();

        // cardinality estimate from normalized bias corrected harmonic mean on
        // the registers
        cachedCount = (long) (alphaMM * (1.0 / sum));
        long pow = (long) Math.pow(2, chosenHashBits);

        // when bias correction is enabled
        if (noBias) {
          cachedCount = cachedCount <= 5 * m ? (cachedCount - estimateBias(cachedCount))
              : cachedCount;
          long h = cachedCount;
          if (numZeros != 0) {
            h = linearCount(m, numZeros);
          }

          if (h < getThreshold()) {
            cachedCount = h;
          }
        } else {
          // HLL algorithm shows stronger bias for values in (2.5 * m) range.
          // To compensate for this short range bias, linear counting is used
          // for values before this short range. The original paper also says
          // similar bias is seen for long range values due to hash collisions
          // in range >1/30*(2^32). For the default case, we do not have to
          // worry about this long range bias as the paper used 32-bit hashing
          // and we use 64-bit hashing as default. 2^64 values are too high to
          // observe long range bias (hash collisions).
          if (cachedCount <= 2.5 * m) {

            // for short range use linear counting
            if (numZeros != 0) {
              cachedCount = linearCount(m, numZeros);
            }
          } else if (chosenHashBits < 64 && cachedCount > (0.033333 * pow)) {

            // long range bias for 32-bit hashcodes
            // NOTE(review): (1 / 30) is integer division and evaluates to 0,
            // so this inner check degenerates to cachedCount > 0; the outer
            // 0.033333 * pow guard approximates the intended 1/30 cutoff.
            // Also note this whole branch is dead code today because
            // chosenHashBits is fixed at 64.
            if (cachedCount > (1 / 30) * pow) {
              cachedCount = (long) (-pow * Math.log(1.0 - (double) cachedCount / (double) pow));
            }
          }
        }
      }
      invalidateCount = false;
    }

    return cachedCount;
  }

  // empirical cutoff (per p) below which linear counting is preferred
  private long getThreshold() {
    return (long) (HLLConstants.thresholdData[p - 4] + 0.5);
  }

  /**
   * Estimate bias from lookup table
   * @param count
   *          - cardinality before bias correction
   * @return cardinality after bias correction
   */
  private long estimateBias(long count) {
    double[] rawEstForP = HLLConstants.rawEstimateData[p - 4];

    // compute distance and store it in sorted map
    // NOTE(review): keys are squared distances, so two raw estimates at the
    // exact same distance collapse to one map entry — presumably rare enough
    // not to matter for the k-NN average; verify if estimates are ever tied.
    TreeMap<Double, Integer> estIndexMap = new TreeMap<>();
    double distance = 0;
    for (int i = 0; i < rawEstForP.length; i++) {
      distance = Math.pow(count - rawEstForP[i], 2);
      estIndexMap.put(distance, i);
    }

    // take top-k closest neighbors and compute the bias corrected cardinality
    long result = 0;
    double[] biasForP = HLLConstants.biasData[p - 4];
    double biasSum = 0;
    int kNeighbors = HLLConstants.K_NEAREST_NEIGHBOR;
    for (Map.Entry<Double, Integer> entry : estIndexMap.entrySet()) {
      biasSum += biasForP[entry.getValue()];
      kNeighbors--;
      if (kNeighbors <= 0) {
        break;
      }
    }

    // 0.5 added for rounding off
    result = (long) ((biasSum / HLLConstants.K_NEAREST_NEIGHBOR) + 0.5);
    return result;
  }

  // overrides the cached count; the next count() recomputes from registers
  public void setCount(long count) {
    this.cachedCount = count;
    this.invalidateCount = true;
  }

  // linear counting estimator: m * ln(m / V) where V = number of zero registers
  private long linearCount(int mVal, long numZeros) {
    return (long) (Math.round(mVal * Math.log(mVal / ((double) numZeros))));
  }

  // refer paper
  public double getStandardError() {
    return 1.04 / Math.sqrt(m);
  }

  public HLLDenseRegister getHLLDenseRegister() {
    return denseRegister;
  }

  public HLLSparseRegister getHLLSparseRegister() {
    return sparseRegister;
  }

  /**
   * Reconstruct sparse map from serialized integer list
   * @param reg
   *          - uncompressed and delta decoded integer list
   */
  public void setHLLSparseRegister(int[] reg) {
    for (int i : reg) {
      int key = i >>> HLLConstants.Q_PRIME_VALUE;
      byte value = (byte) (i & 0x3f);
      sparseRegister.set(key, value);
    }
  }

  /**
   * Reconstruct dense registers from byte array
   * @param reg
   *          - unpacked byte array
   */
  public void setHLLDenseRegister(byte[] reg) {
    int i = 0;
    for (byte b : reg) {
      denseRegister.set(i, b);
      i++;
    }
  }

  /**
   * Merge the specified hyperloglog to the current one. Encoding switches
   * automatically after merge if the encoding switch threshold is exceeded.
   * @param hll
   *          - hyperloglog to be merged
   * @throws IllegalArgumentException
   */
  public void merge(HyperLogLog hll) {
    if (p != hll.p || chosenHashBits != hll.chosenHashBits) {
      throw new IllegalArgumentException(
          "HyperLogLog cannot be merged as either p or hashbits are different. Current: "
              + toString() + " Provided: " + hll.toString());
    }

    EncodingType otherEncoding = hll.getEncoding();

    if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.SPARSE)) {
      sparseRegister.merge(hll.getHLLSparseRegister());
      // if after merge the sparse switching threshold is exceeded then change
      // to dense encoding
      if (sparseRegister.getSize() > encodingSwitchThreshold) {
        encoding = EncodingType.DENSE;
        denseRegister = sparseToDenseRegister(sparseRegister);
        sparseRegister = null;
      }
    } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.DENSE)) {
      denseRegister.merge(hll.getHLLDenseRegister());
    } else if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.DENSE)) {
      // promote ourselves to dense, then fold in the other dense register
      denseRegister = sparseToDenseRegister(sparseRegister);
      denseRegister.merge(hll.getHLLDenseRegister());
      sparseRegister = null;
      encoding = EncodingType.DENSE;
    } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.SPARSE)) {
      // convert a temporary dense view of the other register; hll is unmodified
      HLLDenseRegister otherDenseRegister = sparseToDenseRegister(hll.getHLLSparseRegister());
      denseRegister.merge(otherDenseRegister);
    }

    invalidateCount = true;
  }

  /**
   * Converts sparse to dense hll register
   * @param sparseRegister
   *          - sparse register to be converted
   * @return converted dense register
   */
  private HLLDenseRegister sparseToDenseRegister(HLLSparseRegister sparseRegister) {
    if (sparseRegister == null) {
      return null;
    }
    int p = sparseRegister.getP();
    int pMask = (1 << p) - 1;
    HLLDenseRegister result = new HLLDenseRegister(p, bitPacking);
    // sparse keys are pPrime-bit indexes; keep only the low p bits for dense
    for (Map.Entry<Integer, Byte> entry : sparseRegister.getSparseMap().entrySet()) {
      int key = entry.getKey();
      int idx = key & pMask;
      result.set(idx, entry.getValue());
    }
    return result;
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("Encoding: ");
    sb.append(encoding);
    sb.append(", p: ");
    sb.append(p);
    sb.append(", estimatedCardinality: ");
    sb.append(estimateNumDistinctValues());
    return sb.toString();
  }

  public String toStringExtended() {
    if (encoding.equals(EncodingType.DENSE)) {
      return toString() + ", " + denseRegister.toExtendedString();
    } else if (encoding.equals(EncodingType.SPARSE)) {
      return toString() + ", " + sparseRegister.toExtendedString();
    }

    return toString();
  }

  public int getNumRegisterIndexBits() {
    return p;
  }

  public EncodingType getEncoding() {
    return encoding;
  }

  // NOTE(review): changes only the enum, not the backing registers; callers
  // must set the matching register themselves (used during deserialization).
  public void setEncoding(EncodingType encoding) {
    this.encoding = encoding;
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof HyperLogLog)) {
      return false;
    }

    HyperLogLog other = (HyperLogLog) obj;
    long count = estimateNumDistinctValues();
    long otherCount = other.estimateNumDistinctValues();
    boolean result = p == other.p && chosenHashBits == other.chosenHashBits
        && encoding.equals(other.encoding) && count == otherCount;
    if (encoding.equals(EncodingType.DENSE)) {
      result = result && denseRegister.equals(other.getHLLDenseRegister());
    }

    if (encoding.equals(EncodingType.SPARSE)) {
      result = result && sparseRegister.equals(other.getHLLSparseRegister());
    }
    return result;
  }

  @Override
  public int hashCode() {
    int hashcode = 0;
    hashcode += 31 * p;
    hashcode += 31 * chosenHashBits;
    hashcode += encoding.hashCode();
    hashcode += 31 * estimateNumDistinctValues();
    if (encoding.equals(EncodingType.DENSE)) {
      hashcode += 31 * denseRegister.hashCode();
    }

    if (encoding.equals(EncodingType.SPARSE)) {
      hashcode += 31 * sparseRegister.hashCode();
    }
    return hashcode;
  }

  @Override
  public void reset() {
    // NOTE(review): intentionally(?) a no-op — registers and cached count are
    // NOT cleared. Confirm whether NumDistinctValueEstimator callers rely on
    // reset() emptying the sketch.
  }

  @Override
  public byte[] serialize() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // write bytes to bos ...
    try {
      HyperLogLogUtils.serializeHLL(bos, this);
      byte[] result = bos.toByteArray();
      bos.close();
      return result;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public NumDistinctValueEstimator deserialize(byte[] buf) {
    InputStream is = new ByteArrayInputStream(buf);
    try {
      HyperLogLog result = HyperLogLogUtils.deserializeHLL(is);
      is.close();
      return result;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void addToEstimator(long v) {
    addLong(v);
  }

  @Override
  public void addToEstimator(String s) {
    addString(s);
  }

  @Override
  public void addToEstimator(double d) {
    addDouble(d);
  }

  @Override
  public void addToEstimator(HiveDecimal decimal) {
    addDouble(decimal.doubleValue());
  }

  @Override
  public void mergeEstimators(NumDistinctValueEstimator o) {
    merge((HyperLogLog) o);
  }

  @Override
  public int lengthFor(JavaDataModel model) {
    // 5 is the head, 1<<p means the number of bytes for register
    return (5 + (1 << p));
  }

  @Override
  public boolean canMerge(NumDistinctValueEstimator o) {
    return o instanceof HyperLogLog;
  }

}
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.common.ndv.hll;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;

/**
 * HyperLogLog serialization utilities.
 */
public class HyperLogLogUtils {

  // 3-byte magic prefix identifying a serialized HLL stream
  public static final byte[] MAGIC = new byte[] { 'H', 'L', 'L' };

  /**
   * HyperLogLog is serialized using the following format
   *
   * <pre>
   * |-4 byte-|------varlong----|varint (optional)|----------|
   * ---------------------------------------------------------
   * | header | estimated-count | register-length | register |
   * ---------------------------------------------------------
   *
   * <b>4 byte header</b> is encoded like below
   * 3 bytes - HLL magic string to identify serialized stream
   * 4 bits - p (number of bits to be used as register index)
   * 1 bit - spare bit (not used)
   * 3 bits - encoding (000 - sparse, 001..110 - n bit packing, 111 - no bit packing)
   *
   * Followed by header are 3 fields that are required for reconstruction
   * of hyperloglog
   * Estimated count - variable length long to store last computed estimated count.
   *                   This is just for quick lookup without deserializing registers
   * Register length - number of entries in the register (required only for
   *                   sparse representation. For bit-packing, the register
   *                   length can be found from p)
   * </pre>
   * @param out
   *          - output stream to write to
   * @param hll
   *          - hyperloglog that needs to be serialized
   * @throws IOException
   */
  public static void serializeHLL(OutputStream out, HyperLogLog hll) throws IOException {

    // write header
    out.write(MAGIC);
    int fourthByte = 0;
    int p = hll.getNumRegisterIndexBits();
    // p occupies the high nibble of the fourth header byte
    fourthByte = (p & 0xff) << 4;

    int bitWidth = 0;
    EncodingType enc = hll.getEncoding();

    // determine bit width for bitpacking and encode it in header
    if (enc.equals(EncodingType.DENSE)) {
      int lzr = hll.getHLLDenseRegister().getMaxRegisterValue();
      bitWidth = getBitWidth(lzr);

      // the max value of number of zeroes for 64 bit hash can be encoded using
      // only 6 bits. So we will disable bit packing for any values >6
      if (bitWidth > 6) {
        fourthByte |= 7;
        bitWidth = 8;
      } else {
        fourthByte |= (bitWidth & 7);
      }
    }
    // (sparse encoding leaves the low 3 bits as 000)

    // write fourth byte of header
    out.write(fourthByte);

    // write estimated count
    long estCount = hll.estimateNumDistinctValues();
    writeVulong(out, estCount);

    // serialize dense/sparse registers. Dense registers are bitpacked whereas
    // sparse registers are delta and variable length encoded
    if (enc.equals(EncodingType.DENSE)) {
      byte[] register = hll.getHLLDenseRegister().getRegister();
      bitpackHLLRegister(out, register, bitWidth);
    } else if (enc.equals(EncodingType.SPARSE)) {
      TreeMap<Integer, Byte> sparseMap = hll.getHLLSparseRegister().getSparseMap();

      // write the number of elements in sparse map (required for
      // reconstruction)
      writeVulong(out, sparseMap.size());

      // compute deltas and write the values as varints; each entry packs
      // (key << Q_PRIME_VALUE) | value into a single int before delta coding
      int prev = 0;
      for (Map.Entry<Integer, Byte> entry : sparseMap.entrySet()) {
        if (prev == 0) {
          // first entry is written as-is to seed the delta chain
          prev = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | entry.getValue();
          writeVulong(out, prev);
        } else {
          int curr = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | entry.getValue();
          int delta = curr - prev;
          writeVulong(out, delta);
          prev = curr;
        }
      }
    }
  }

  /**
   * Refer serializeHLL() for format of serialization. This function
   * deserializes the serialized hyperloglogs
   * @param in
   *          - input stream
   * @return deserialized hyperloglog
   * @throws IOException
   */
  public static HyperLogLog deserializeHLL(InputStream in) throws IOException {
    checkMagicString(in);
    int fourthByte = in.read() & 0xff;
    int p = fourthByte >>> 4;

    // read type of encoding from the low 3 bits
    int enc = fourthByte & 7;
    EncodingType encoding = null;
    int bitSize = 0;
    if (enc == 0) {
      encoding = EncodingType.SPARSE;
    } else if (enc > 0 && enc < 7) {
      bitSize = enc;
      encoding = EncodingType.DENSE;
    } else {
      // bit packing disabled
      bitSize = 8;
      encoding = EncodingType.DENSE;
    }

    // estimated count
    long estCount = readVulong(in);

    HyperLogLog result = null;
    if (encoding.equals(EncodingType.SPARSE)) {
      result = HyperLogLog.builder().setNumRegisterIndexBits(p)
          .setEncoding(EncodingType.SPARSE).build();
      int numRegisterEntries = (int) readVulong(in);
      int[] reg = new int[numRegisterEntries];
      int prev = 0;

      // reconstruct the sparse map from delta encoded and varint input stream
      if (numRegisterEntries > 0) {
        prev = (int) readVulong(in);
        reg[0] = prev;
      }
      int delta = 0;
      int curr = 0;
      for (int i = 1; i < numRegisterEntries; i++) {
        delta = (int) readVulong(in);
        curr = prev + delta;
        reg[i] = curr;
        prev = curr;
      }
      result.setHLLSparseRegister(reg);
    } else {

      // explicitly disable bit packing when the stream was written unpacked
      if (bitSize == 8) {
        result = HyperLogLog.builder().setNumRegisterIndexBits(p)
            .setEncoding(EncodingType.DENSE).enableBitPacking(false).build();
      } else {
        result = HyperLogLog.builder().setNumRegisterIndexBits(p)
            .setEncoding(EncodingType.DENSE).enableBitPacking(true).build();
      }
      int m = 1 << p;
      byte[] register = unpackHLLRegister(in, m, bitSize);
      result.setHLLDenseRegister(register);
    }

    result.setCount(estCount);

    return result;
  }

  /**
   * Bit-packs the dense register: each register value is written using only
   * bitWidth bits, packed MSB-first into consecutive output bytes.
   */
  private static void bitpackHLLRegister(OutputStream out, byte[] register, int bitWidth)
      throws IOException {
    int bitsLeft = 8;
    byte current = 0;

    if (bitWidth == 8) {
      // no packing needed; one register value per byte
      fastPathWrite(out, register);
      return;
    }

    // write the blob
    for (byte value : register) {
      int bitsToWrite = bitWidth;
      while (bitsToWrite > bitsLeft) {
        // add the bits to the bottom of the current word
        current |= value >>> (bitsToWrite - bitsLeft);
        // subtract out the bits we just added
        bitsToWrite -= bitsLeft;
        // zero out the bits above bitsToWrite
        value &= (1 << bitsToWrite) - 1;
        out.write(current);
        current = 0;
        bitsLeft = 8;
      }
      bitsLeft -= bitsToWrite;
      current |= value << bitsLeft;
      if (bitsLeft == 0) {
        out.write(current);
        current = 0;
        bitsLeft = 8;
      }
    }

    out.flush();
  }

  // unpacked (bitWidth == 8) write path: one byte per register value
  private static void fastPathWrite(OutputStream out, byte[] register) throws IOException {
    for (byte b : register) {
      out.write(b);
    }
  }

  /**
   * Unpack the bitpacked HyperLogLog register.
   * @param in
   *          - input stream
   * @param length
   *          - number of register entries to reconstruct (m = 2^p)
   * @param bitSize
   *          - number of bits used per packed value (1..6, or 8 for unpacked)
   * @return unpacked HLL register
   * @throws IOException
   */
  private static byte[] unpackHLLRegister(InputStream in, int length, int bitSize)
      throws IOException {
    int mask = (1 << bitSize) - 1;
    int bitsLeft = 8;

    if (bitSize == 8) {
      return fastPathRead(in, length);
    }

    byte current = (byte) (0xff & in.read());

    byte[] output = new byte[length];
    for (int i = 0; i < output.length; i++) {
      byte result = 0;
      int bitsLeftToRead = bitSize;
      while (bitsLeftToRead > bitsLeft) {
        // consume the remainder of the current byte, then refill
        result <<= bitsLeft;
        result |= current & ((1 << bitsLeft) - 1);
        bitsLeftToRead -= bitsLeft;
        current = (byte) (0xff & in.read());
        bitsLeft = 8;
      }
      if (bitsLeftToRead > 0) {
        result <<= bitsLeftToRead;
        bitsLeft -= bitsLeftToRead;
        result |= (current >>> bitsLeft) & ((1 << bitsLeftToRead) - 1);
      }
      output[i] = (byte) (result & mask);
    }
    return output;
  }

  // unpacked (bitSize == 8) read path: one byte per register value
  // NOTE(review): EOF is not detected here — in.read() returning -1 silently
  // becomes 0xFF; confirm callers always supply a complete stream.
  private static byte[] fastPathRead(InputStream in, int length) throws IOException {
    byte[] result = new byte[length];
    for (int i = 0; i < length; i++) {
      result[i] = (byte) in.read();
    }
    return result;
  }

  /**
   * Get estimated cardinality without deserializing HLL
   * @param in
   *          - serialized HLL
   * @return - cardinality
   * @throws IOException
   */
  public static long getEstimatedCountFromSerializedHLL(InputStream in) throws IOException {
    checkMagicString(in);
    // skip the fourth header byte (p + encoding); not needed for the count
    in.read();
    return readVulong(in);
  }

  /**
   * Check if the specified input stream is actually a HLL stream
   * @param in
   *          - input stream
   * @throws IOException
   */
  private static void checkMagicString(InputStream in) throws IOException {
    byte[] magic = new byte[3];
    magic[0] = (byte) in.read();
    magic[1] = (byte) in.read();
    magic[2] = (byte) in.read();

    if (!Arrays.equals(magic, MAGIC)) {
      throw new IllegalArgumentException("The input stream is not a HyperLogLog stream.");
    }
  }

  /**
   * Minimum bits required to encode the specified value
   * @param val
   *          - input value
   * @return number of significant bits in val (0 for val == 0)
   */
  private static int getBitWidth(int val) {
    int count = 0;
    while (val != 0) {
      count++;
      // byte cast is harmless for the small register values passed here
      val = (byte) (val >>> 1);
    }
    return count;
  }

  /**
   * Return relative error between actual and estimated cardinality
   * @param actualCount
   *          - actual count
   * @param estimatedCount
   *          - estimated count
   * @return relative error in percent (signed; negative for over-estimates)
   */
  public static float getRelativeError(long actualCount, long estimatedCount) {
    float err = (1.0f - ((float) estimatedCount / (float) actualCount)) * 100.0f;
    return err;
  }

  /**
   * Write variable length encoded longs to output stream
   * (7 data bits per byte, high bit set on all but the final byte)
   * @param output
   *          - out stream
   * @param value
   *          - long
   * @throws IOException
   */
  private static void writeVulong(OutputStream output, long value) throws IOException {
    while (true) {
      if ((value & ~0x7f) == 0) {
        output.write((byte) value);
        return;
      } else {
        output.write((byte) (0x80 | (value & 0x7f)));
        value >>>= 7;
      }
    }
  }

  /**
   * Read variable length encoded longs from input stream
   * @param in
   *          - input stream
   * @return decoded long value
   * @throws IOException
   */
  private static long readVulong(InputStream in) throws IOException {
    long result = 0;
    long b;
    int offset = 0;
    do {
      b = in.read();
      if (b == -1) {
        throw new EOFException("Reading Vulong past EOF");
      }
      result |= (0x7f & b) << offset;
      offset += 7;
    } while (b >= 0x80);
    return result;
  }

}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java
new file mode 100644
index 0000000..2e00005
--- /dev/null
+++
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Deadline.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.api.MetaException; + +/** + * This class is used to monitor long running methods in a thread. + * It is recommended to use it as a ThreadLocal variable. + */ +public class Deadline { + private static final Logger LOG = LoggerFactory.getLogger(Deadline.class.getName()); + + /** + * its value is init from conf, and could be reset from client. + */ + private long timeoutNanos; + + /** + * it is reset before executing a method + */ + private long startTime = NO_DEADLINE; + + /** + * The name of public methods in HMSHandler + */ + private String method; + + private Deadline(long timeoutMs) { + this.timeoutNanos = timeoutMs * 1000000L; + } + + /** + * Deadline object per thread. 
+ */ + private static final ThreadLocal<Deadline> DEADLINE_THREAD_LOCAL = new ThreadLocal<Deadline>() { + @Override + protected Deadline initialValue() { + return null; + } + }; + + private static void setCurrentDeadline(Deadline deadline) { + DEADLINE_THREAD_LOCAL.set(deadline); + } + + static Deadline getCurrentDeadline() { + return DEADLINE_THREAD_LOCAL.get(); + } + + private static void removeCurrentDeadline() { + DEADLINE_THREAD_LOCAL.remove(); + } + + /** + * register a Deadline threadlocal object to current thread. + * @param timeout + */ + public static void registerIfNot(long timeout) { + if (getCurrentDeadline() == null) { + setCurrentDeadline(new Deadline(timeout)); + } + } + + /** + * reset the timeout value of this timer. + * @param timeoutMs + */ + public static void resetTimeout(long timeoutMs) throws MetaException { + if (timeoutMs <= 0) { + throw MetaStoreUtils.newMetaException(new DeadlineException("The reset timeout value should be " + + "larger than 0: " + timeoutMs)); + } + Deadline deadline = getCurrentDeadline(); + if (deadline != null) { + deadline.timeoutNanos = timeoutMs * 1000000L; + } else { + throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," + + " please register it first.")); + } + } + + /** + * start the timer before a method is invoked. + * @param method method to be invoked + */ + public static boolean startTimer(String method) throws MetaException { + Deadline deadline = getCurrentDeadline(); + if (deadline == null) { + throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," + + " please register it first.")); + } + if (deadline.startTime != NO_DEADLINE) return false; + deadline.method = method; + do { + deadline.startTime = System.nanoTime(); + } while (deadline.startTime == NO_DEADLINE); + return true; + } + + /** + * end the time after a method is done. 
+ */ + public static void stopTimer() throws MetaException { + Deadline deadline = getCurrentDeadline(); + if (deadline != null) { + deadline.startTime = NO_DEADLINE; + deadline.method = null; + } else { + throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," + + " please register it first.")); + } + } + + /** + * remove the registered Deadline threadlocal object from current thread. + */ + public static void clear() { + removeCurrentDeadline(); + } + + /** + * Check whether the long running method timeout. + * @throws MetaException when the method timeout + */ + public static void checkTimeout() throws MetaException { + Deadline deadline = getCurrentDeadline(); + if (deadline != null) { + deadline.check(); + } else { + throw MetaStoreUtils.newMetaException(new DeadlineException("The threadlocal Deadline is null," + + " please register it first.")); + } + } + + private static final long NO_DEADLINE = Long.MIN_VALUE; + + private void check() throws MetaException{ + try { + if (startTime == NO_DEADLINE) { + throw new DeadlineException("Should execute startTimer() method before " + + "checkTimeout. 
Error happens in method: " + method); + } + long elapsedTime = System.nanoTime() - startTime; + if (elapsedTime > timeoutNanos) { + throw new DeadlineException("Timeout when executing method: " + method + "; " + + (elapsedTime / 1000000L) + "ms exceeds " + (timeoutNanos / 1000000L) + "ms"); + } + } catch (DeadlineException e) { + throw MetaStoreUtils.newMetaException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java new file mode 100644 index 0000000..bfff89d --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/DeadlineException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +/** + * Thrown when a long running method timeout is checked. 
+ */ +public class DeadlineException extends Exception { + + public DeadlineException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java new file mode 100644 index 0000000..4c14ab0 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; + +/** + * The base implementation of a file metadata handler for a specific file type. + * There are currently two classes for each file type (of 1), this one, which is very simple due + * to the fact that it just calls the proxy class for most calls; and the proxy class, that + * contains the actual implementation that depends on some stuff in QL (for ORC). + */ +public abstract class FileMetadataHandler { + protected static final Log LOG = LogFactory.getLog(FileMetadataHandler.class); + + private Configuration conf; + private PartitionExpressionProxy expressionProxy; + private FileFormatProxy fileFormatProxy; + private MetadataStore store; + + /** + * Same as RawStore.getFileMetadataByExpr. + */ + public abstract void getFileMetadataByExpr(List<Long> fileIds, byte[] expr, + ByteBuffer[] metadatas, ByteBuffer[] results, boolean[] eliminated) throws IOException; + + protected abstract FileMetadataExprType getType(); + + protected PartitionExpressionProxy getExpressionProxy() { + return expressionProxy; + } + + protected FileFormatProxy getFileFormatProxy() { + return fileFormatProxy; + } + + protected MetadataStore getStore() { + return store; + } + + /** + * Configures the handler. Called once before use. + * @param conf Config. + * @param expressionProxy Expression proxy to access ql stuff. + * @param store Storage interface to manipulate the metadata. 
+ */ + public void configure( + Configuration conf, PartitionExpressionProxy expressionProxy, MetadataStore store) { + this.conf = conf; + this.expressionProxy = expressionProxy; + this.store = store; + this.fileFormatProxy = expressionProxy.getFileFormatProxy(getType()); + } + + /** + * Caches the file metadata for a particular file. + * @param fileId File id. + * @param fs The filesystem of the file. + * @param path Path to the file. + */ + public void cacheFileMetadata(long fileId, FileSystem fs, Path path) + throws IOException, InterruptedException { + // ORC is in ql, so we cannot do anything here. For now, all the logic is in the proxy. + ByteBuffer[] cols = fileFormatProxy.getAddedColumnsToCache(); + ByteBuffer[] vals = (cols == null) ? null : new ByteBuffer[cols.length]; + ByteBuffer metadata = fileFormatProxy.getMetadataToCache(fs, path, vals); + LOG.info("Caching file metadata for " + path + ", size " + metadata.remaining()); + store.storeFileMetadata(fileId, metadata, cols, vals); + } + + /** + * @return the added column names to be cached in metastore with the metadata for this type. + */ + public ByteBuffer[] createAddedCols() { + return fileFormatProxy.getAddedColumnsToCache(); + } + + /** + * @return the values for the added columns returned by createAddedCols for respective metadatas. + */ + public ByteBuffer[][] createAddedColVals(List<ByteBuffer> metadata) { + return fileFormatProxy.getAddedValuesToCache(metadata); + } +}
