HIVE-17733 Move RawStore to standalone metastore. This closes #258 github PR. (Alan Gates, reviewed by Sergey Shelukhin, Vihang Karajgaonkar, and Zoltan Haindrich)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/133d3c47 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/133d3c47 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/133d3c47 Branch: refs/heads/master Commit: 133d3c4733aca4fad9fd327bb14195aa5a78c664 Parents: ea89de7 Author: Alan Gates <[email protected]> Authored: Sat Oct 14 07:41:18 2017 -0700 Committer: Alan Gates <[email protected]> Committed: Sat Oct 14 07:41:18 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/StatsSetupConst.java | 314 - .../common/ndv/NumDistinctValueEstimator.java | 51 - .../ndv/NumDistinctValueEstimatorFactory.java | 74 - .../hadoop/hive/common/ndv/fm/FMSketch.java | 359 - .../hive/common/ndv/fm/FMSketchUtils.java | 132 - .../hive/common/ndv/hll/HLLConstants.java | 929 -- .../hive/common/ndv/hll/HLLDenseRegister.java | 192 - .../hadoop/hive/common/ndv/hll/HLLRegister.java | 50 - .../hive/common/ndv/hll/HLLSparseRegister.java | 260 - .../hadoop/hive/common/ndv/hll/HyperLogLog.java | 634 -- .../hive/common/ndv/hll/HyperLogLogUtils.java | 392 - .../hadoop/hive/common/TestStatsSetupConst.java | 110 - .../ndv/fm/TestFMSketchSerialization.java | 98 - .../hive/common/ndv/hll/TestHLLNoBias.java | 114 - .../common/ndv/hll/TestHLLSerialization.java | 267 - .../hive/common/ndv/hll/TestHyperLogLog.java | 227 - .../common/ndv/hll/TestHyperLogLogDense.java | 82 - .../common/ndv/hll/TestHyperLogLogSparse.java | 81 - .../common/ndv/hll/TestSparseEncodeHash.java | 56 - metastore/pom.xml | 14 - .../apache/hadoop/hive/metastore/Deadline.java | 171 - .../hive/metastore/DeadlineException.java | 29 - .../hive/metastore/FileMetadataHandler.java | 110 - .../hadoop/hive/metastore/HiveMetaStore.java | 11 - .../hive/metastore/MetaStoreDirectSql.java | 2222 ----- .../hadoop/hive/metastore/MetaStoreUtils.java | 183 +- .../hadoop/hive/metastore/ObjectStore.java | 9303 ----------------- 
.../hive/metastore/PartFilterExprUtil.java | 149 - .../apache/hadoop/hive/metastore/RawStore.java | 747 -- .../hadoop/hive/metastore/RawStoreProxy.java | 118 - .../hive/metastore/StatObjectConverter.java | 888 -- .../hive/metastore/cache/ByteArrayWrapper.java | 45 - .../hadoop/hive/metastore/cache/CacheUtils.java | 144 - .../hive/metastore/cache/CachedStore.java | 2246 ----- .../hive/metastore/cache/SharedCache.java | 559 -- .../aggr/BinaryColumnStatsAggregator.java | 64 - .../aggr/BooleanColumnStatsAggregator.java | 65 - .../columnstats/aggr/ColumnStatsAggregator.java | 33 - .../aggr/ColumnStatsAggregatorFactory.java | 113 - .../aggr/DateColumnStatsAggregator.java | 362 - .../aggr/DecimalColumnStatsAggregator.java | 375 - .../aggr/DoubleColumnStatsAggregator.java | 349 - .../aggr/IExtrapolatePartStatus.java | 47 - .../aggr/LongColumnStatsAggregator.java | 348 - .../aggr/StringColumnStatsAggregator.java | 305 - .../cache/DateColumnStatsDataInspector.java | 124 - .../cache/DecimalColumnStatsDataInspector.java | 124 - .../cache/DoubleColumnStatsDataInspector.java | 124 - .../cache/LongColumnStatsDataInspector.java | 124 - .../cache/StringColumnStatsDataInspector.java | 125 - .../merge/BinaryColumnStatsMerger.java | 35 - .../merge/BooleanColumnStatsMerger.java | 35 - .../columnstats/merge/ColumnStatsMerger.java | 31 - .../merge/ColumnStatsMergerFactory.java | 120 - .../merge/DateColumnStatsMerger.java | 59 - .../merge/DecimalColumnStatsMerger.java | 61 - .../merge/DoubleColumnStatsMerger.java | 54 - .../merge/LongColumnStatsMerger.java | 54 - .../merge/StringColumnStatsMerger.java | 54 - .../hive/metastore/hbase/MetadataStore.java | 52 - .../hive/metastore/parser/ExpressionTree.java | 605 -- .../hadoop/hive/metastore/parser/Filter.g | 484 - .../hive/metastore/parser/package-info.java | 23 - .../hadoop/hive/metastore/TestDeadline.java | 127 - .../hadoop/hive/metastore/TestObjectStore.java | 583 -- .../hadoop/hive/metastore/TestObjectStore2.java | 229 + 
.../hive/metastore/TestRawStoreProxy.java | 64 - .../hive/metastore/cache/TestCachedStore.java | 901 -- .../hive/ql/parse/BaseSemanticAnalyzer.java | 4 +- .../hive/ql/io/orc/TestOrcSplitElimination.java | 2 +- standalone-metastore/pom.xml | 35 +- .../hadoop/hive/common/StatsSetupConst.java | 315 + .../common/ndv/NumDistinctValueEstimator.java | 51 + .../ndv/NumDistinctValueEstimatorFactory.java | 74 + .../hadoop/hive/common/ndv/fm/FMSketch.java | 359 + .../hive/common/ndv/fm/FMSketchUtils.java | 132 + .../hive/common/ndv/hll/HLLConstants.java | 929 ++ .../hive/common/ndv/hll/HLLDenseRegister.java | 192 + .../hadoop/hive/common/ndv/hll/HLLRegister.java | 50 + .../hive/common/ndv/hll/HLLSparseRegister.java | 260 + .../hadoop/hive/common/ndv/hll/HyperLogLog.java | 634 ++ .../hive/common/ndv/hll/HyperLogLogUtils.java | 392 + .../apache/hadoop/hive/metastore/Deadline.java | 172 + .../hive/metastore/DeadlineException.java | 29 + .../hive/metastore/FileMetadataHandler.java | 109 + .../hive/metastore/MetaStoreDirectSql.java | 2223 +++++ .../hadoop/hive/metastore/MetadataStore.java | 52 + .../hadoop/hive/metastore/ObjectStore.java | 9313 ++++++++++++++++++ .../hive/metastore/PartFilterExprUtil.java | 157 + .../apache/hadoop/hive/metastore/RawStore.java | 747 ++ .../hadoop/hive/metastore/RawStoreProxy.java | 114 + .../hive/metastore/StatObjectConverter.java | 888 ++ .../hive/metastore/cache/ByteArrayWrapper.java | 45 + .../hadoop/hive/metastore/cache/CacheUtils.java | 143 + .../hive/metastore/cache/CachedStore.java | 2242 +++++ .../hive/metastore/cache/SharedCache.java | 557 ++ .../aggr/BinaryColumnStatsAggregator.java | 64 + .../aggr/BooleanColumnStatsAggregator.java | 65 + .../columnstats/aggr/ColumnStatsAggregator.java | 33 + .../aggr/ColumnStatsAggregatorFactory.java | 113 + .../aggr/DateColumnStatsAggregator.java | 362 + .../aggr/DecimalColumnStatsAggregator.java | 375 + .../aggr/DoubleColumnStatsAggregator.java | 349 + .../aggr/IExtrapolatePartStatus.java | 47 + 
.../aggr/LongColumnStatsAggregator.java | 348 + .../aggr/StringColumnStatsAggregator.java | 305 + .../cache/DateColumnStatsDataInspector.java | 124 + .../cache/DecimalColumnStatsDataInspector.java | 124 + .../cache/DoubleColumnStatsDataInspector.java | 124 + .../cache/LongColumnStatsDataInspector.java | 124 + .../cache/StringColumnStatsDataInspector.java | 125 + .../merge/BinaryColumnStatsMerger.java | 35 + .../merge/BooleanColumnStatsMerger.java | 35 + .../columnstats/merge/ColumnStatsMerger.java | 31 + .../merge/ColumnStatsMergerFactory.java | 120 + .../merge/DateColumnStatsMerger.java | 59 + .../merge/DecimalColumnStatsMerger.java | 61 + .../merge/DoubleColumnStatsMerger.java | 54 + .../merge/LongColumnStatsMerger.java | 54 + .../merge/StringColumnStatsMerger.java | 54 + .../hive/metastore/conf/MetastoreConf.java | 23 +- .../hive/metastore/parser/ExpressionTree.java | 605 ++ .../hadoop/hive/metastore/parser/Filter.g | 484 + .../hive/metastore/parser/package-info.java | 23 + .../txn/AcidOpenTxnsCounterService.java | 3 +- .../hadoop/hive/metastore/utils/JavaUtils.java | 33 + .../hive/metastore/utils/MetaStoreUtils.java | 306 + .../hadoop/hive/metastore/utils/ObjectPair.java | 86 + .../hadoop/hive/common/TestStatsSetupConst.java | 110 + .../ndv/fm/TestFMSketchSerialization.java | 98 + .../hive/common/ndv/hll/TestHLLNoBias.java | 114 + .../common/ndv/hll/TestHLLSerialization.java | 267 + .../hive/common/ndv/hll/TestHyperLogLog.java | 227 + .../common/ndv/hll/TestHyperLogLogDense.java | 82 + .../common/ndv/hll/TestHyperLogLogSparse.java | 81 + .../common/ndv/hll/TestSparseEncodeHash.java | 56 + .../hadoop/hive/metastore/TestDeadline.java | 127 + .../hadoop/hive/metastore/TestObjectStore.java | 462 + .../hive/metastore/TestRawStoreProxy.java | 64 + .../hive/metastore/cache/TestCachedStore.java | 902 ++ 140 files changed, 27775 insertions(+), 27397 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java deleted file mode 100644 index 7c27d07..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.common; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; - - -/** - * A class that defines the constant strings used by the statistics implementation. 
- */ - -public class StatsSetupConst { - - protected static final Logger LOG = LoggerFactory.getLogger(StatsSetupConst.class.getName()); - - public enum StatDB { - fs { - @Override - public String getPublisher(Configuration conf) { - return "org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher"; - } - - @Override - public String getAggregator(Configuration conf) { - return "org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator"; - } - }, - custom { - @Override - public String getPublisher(Configuration conf) { - return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER); } - @Override - public String getAggregator(Configuration conf) { - return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR); } - }; - public abstract String getPublisher(Configuration conf); - public abstract String getAggregator(Configuration conf); - } - - // statistics stored in metastore - /** - * The name of the statistic Num Files to be published or gathered. - */ - public static final String NUM_FILES = "numFiles"; - - /** - * The name of the statistic Num Partitions to be published or gathered. - */ - public static final String NUM_PARTITIONS = "numPartitions"; - - /** - * The name of the statistic Total Size to be published or gathered. - */ - public static final String TOTAL_SIZE = "totalSize"; - - /** - * The name of the statistic Row Count to be published or gathered. - */ - public static final String ROW_COUNT = "numRows"; - - public static final String RUN_TIME_ROW_COUNT = "runTimeNumRows"; - - /** - * The name of the statistic Raw Data Size to be published or gathered. - */ - public static final String RAW_DATA_SIZE = "rawDataSize"; - - /** - * Temp dir for writing stats from tasks. 
- */ - public static final String STATS_TMP_LOC = "hive.stats.tmp.loc"; - - public static final String STATS_FILE_PREFIX = "tmpstats-"; - /** - * List of all supported statistics - */ - public static final String[] supportedStats = {NUM_FILES,ROW_COUNT,TOTAL_SIZE,RAW_DATA_SIZE}; - - /** - * List of all statistics that need to be collected during query execution. These are - * statistics that inherently require a scan of the data. - */ - public static final String[] statsRequireCompute = new String[] {ROW_COUNT,RAW_DATA_SIZE}; - - /** - * List of statistics that can be collected quickly without requiring a scan of the data. - */ - public static final String[] fastStats = new String[] {NUM_FILES,TOTAL_SIZE}; - - // This string constant is used to indicate to AlterHandler that - // alterPartition/alterTable is happening via statsTask or via user. - public static final String STATS_GENERATED = "STATS_GENERATED"; - - public static final String TASK = "TASK"; - - public static final String USER = "USER"; - - // This string constant is used by AlterHandler to figure out that it should not attempt to - // update stats. It is set by any client-side task which wishes to signal that no stats - // update should take place, such as with replication. - public static final String DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS"; - - //This string constant will be persisted in metastore to indicate whether corresponding - //table or partition's statistics and table or partition's column statistics are accurate or not. - public static final String COLUMN_STATS_ACCURATE = "COLUMN_STATS_ACCURATE"; - - public static final String COLUMN_STATS = "COLUMN_STATS"; - - public static final String BASIC_STATS = "BASIC_STATS"; - - public static final String CASCADE = "CASCADE"; - - public static final String TRUE = "true"; - - public static final String FALSE = "false"; - - // The parameter keys for the table statistics. Those keys are excluded from 'show create table' command output. 
- public static final String[] TABLE_PARAMS_STATS_KEYS = new String[] { - COLUMN_STATS_ACCURATE, NUM_FILES, TOTAL_SIZE,ROW_COUNT, RAW_DATA_SIZE, NUM_PARTITIONS}; - - private static class ColumnStatsAccurate { - private static ObjectReader objectReader; - private static ObjectWriter objectWriter; - - static { - ObjectMapper objectMapper = new ObjectMapper(); - objectReader = objectMapper.readerFor(ColumnStatsAccurate.class); - objectWriter = objectMapper.writerFor(ColumnStatsAccurate.class); - } - - static class BooleanSerializer extends JsonSerializer<Boolean> { - - @Override - public void serialize(Boolean value, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException, JsonProcessingException { - jsonGenerator.writeString(value.toString()); - } - } - - static class BooleanDeserializer extends JsonDeserializer<Boolean> { - - public Boolean deserialize(JsonParser jsonParser, - DeserializationContext deserializationContext) - throws IOException, JsonProcessingException { - return Boolean.valueOf(jsonParser.getValueAsString()); - } - } - - @JsonInclude(JsonInclude.Include.NON_DEFAULT) - @JsonSerialize(using = BooleanSerializer.class) - @JsonDeserialize(using = BooleanDeserializer.class) - @JsonProperty(BASIC_STATS) - boolean basicStats; - - @JsonInclude(JsonInclude.Include.NON_EMPTY) - @JsonProperty(COLUMN_STATS) - @JsonSerialize(contentUsing = BooleanSerializer.class) - @JsonDeserialize(contentUsing = BooleanDeserializer.class) - TreeMap<String, Boolean> columnStats = new TreeMap<>(); - - }; - - public static boolean areBasicStatsUptoDate(Map<String, String> params) { - if (params == null) { - return false; - } - ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE)); - return stats.basicStats; - } - - public static boolean areColumnStatsUptoDate(Map<String, String> params, String colName) { - if (params == null) { - return false; - } - ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE)); 
- return stats.columnStats.containsKey(colName); - } - - // It will only throw JSONException when stats.put(BASIC_STATS, TRUE) - // has duplicate key, which is not possible - // note that set basic stats false will wipe out column stats too. - public static void setBasicStatsState(Map<String, String> params, String setting) { - if (setting.equals(FALSE)) { - if (params!=null && params.containsKey(COLUMN_STATS_ACCURATE)) { - params.remove(COLUMN_STATS_ACCURATE); - } - return; - } - if (params == null) { - throw new RuntimeException("params are null...cant set columnstatstate!"); - } - ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE)); - stats.basicStats = true; - try { - params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats)); - } catch (JsonProcessingException e) { - throw new RuntimeException("can't serialize column stats", e); - } - } - - public static void setColumnStatsState(Map<String, String> params, List<String> colNames) { - if (params == null) { - throw new RuntimeException("params are null...cant set columnstatstate!"); - } - if (colNames == null) { - return; - } - ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE)); - - for (String colName : colNames) { - if (!stats.columnStats.containsKey(colName)) { - stats.columnStats.put(colName, true); - } - } - try { - params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats)); - } catch (JsonProcessingException e) { - LOG.trace(e.getMessage()); - } - } - - public static void clearColumnStatsState(Map<String, String> params) { - if (params == null) { - return; - } - ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE)); - stats.columnStats.clear(); - - try { - params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats)); - } catch (JsonProcessingException e) { - LOG.trace(e.getMessage()); - } - } - - public static void 
removeColumnStatsState(Map<String, String> params, List<String> colNames) { - if (params == null) { - return; - } - try { - ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE)); - for (String string : colNames) { - stats.columnStats.remove(string); - } - params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats)); - } catch (JsonProcessingException e) { - LOG.trace(e.getMessage()); - } - } - - public static void setStatsStateForCreateTable(Map<String, String> params, - List<String> cols, String setting) { - if (TRUE.equals(setting)) { - for (String stat : StatsSetupConst.supportedStats) { - params.put(stat, "0"); - } - } - setBasicStatsState(params, setting); - setColumnStatsState(params, cols); - } - - private static ColumnStatsAccurate parseStatsAcc(String statsAcc) { - if (statsAcc == null) { - return new ColumnStatsAccurate(); - } - try { - return ColumnStatsAccurate.objectReader.readValue(statsAcc); - } catch (Exception e) { - ColumnStatsAccurate ret = new ColumnStatsAccurate(); - if (TRUE.equalsIgnoreCase(statsAcc)) { - ret.basicStats = true; - } - return ret; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java b/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java deleted file mode 100644 index ed0db14..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common.ndv; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.util.JavaDataModel; - -public interface NumDistinctValueEstimator { - - static final Logger LOG = LoggerFactory.getLogger(NumDistinctValueEstimator.class.getName()); - - public void reset(); - - public byte[] serialize(); - - public NumDistinctValueEstimator deserialize(byte[] buf); - - public void addToEstimator(long v); - - public void addToEstimator(double d); - - public void addToEstimator(String s); - - public void addToEstimator(HiveDecimal decimal); - - public void mergeEstimators(NumDistinctValueEstimator o); - - public long estimateNumDistinctValues(); - - public int lengthFor(JavaDataModel model); - - public boolean canMerge(NumDistinctValueEstimator o); - -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java b/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java deleted file mode 100644 index ca90759..0000000 --- 
a/common/src/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.hive.common.ndv; - -import java.io.IOException; -import java.util.Arrays; - -import org.apache.hadoop.hive.common.ndv.fm.FMSketch; -import org.apache.hadoop.hive.common.ndv.fm.FMSketchUtils; -import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog; - -public class NumDistinctValueEstimatorFactory { - - private NumDistinctValueEstimatorFactory() { - } - - private static boolean isFMSketch(byte[] buf) throws IOException { - byte[] magic = new byte[2]; - magic[0] = (byte) buf[0]; - magic[1] = (byte) buf[1]; - return Arrays.equals(magic, FMSketchUtils.MAGIC); - } - - public static NumDistinctValueEstimator getNumDistinctValueEstimator(byte[] buf) { - // Right now we assume only FM and HLL are available. 
- try { - if (isFMSketch(buf)) { - return FMSketchUtils.deserializeFM(buf); - } else { - return HyperLogLog.builder().build().deserialize(buf); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static NumDistinctValueEstimator getEmptyNumDistinctValueEstimator( - NumDistinctValueEstimator n) { - if (n instanceof FMSketch) { - return new FMSketch(((FMSketch) n).getNumBitVectors()); - } else { - return HyperLogLog.builder().build(); - } - } - - public static NumDistinctValueEstimator getEmptyNumDistinctValueEstimator(String func, - int numBitVectors) { - if ("fm".equals(func.toLowerCase())) { - return new FMSketch(numBitVectors); - } else if ("hll".equals(func.toLowerCase())) { - return HyperLogLog.builder().build(); - } else { - throw new RuntimeException("Can not recognize " + func); - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java b/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java deleted file mode 100644 index 36a49c2..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java +++ /dev/null @@ -1,359 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common.ndv.fm; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Random; - -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javolution.util.FastBitSet; - -public class FMSketch implements NumDistinctValueEstimator { - - static final Logger LOG = LoggerFactory.getLogger(FMSketch.class.getName()); - - /* We want a,b,x to come from a finite field of size 0 to k, where k is a prime number. - * 2^p - 1 is prime for p = 31. Hence bitvectorSize has to be 31. Pick k to be 2^p -1. - * If a,b,x didn't come from a finite field ax1 + b mod k and ax2 + b mod k will not be pair wise - * independent. As a consequence, the hash values will not distribute uniformly from 0 to 2^p-1 - * thus introducing errors in the estimates. 
- */ - public static final int BIT_VECTOR_SIZE = 31; - - // Refer to Flajolet-Martin'86 for the value of phi - private static final double PHI = 0.77351; - - private final int[] a; - private final int[] b; - private final FastBitSet[] bitVector; - - private final Random aValue; - private final Random bValue; - - private int numBitVectors; - - /* Create a new distinctValueEstimator - */ - public FMSketch(int numBitVectors) { - this.numBitVectors = numBitVectors; - bitVector = new FastBitSet[numBitVectors]; - for (int i=0; i< numBitVectors; i++) { - bitVector[i] = new FastBitSet(BIT_VECTOR_SIZE); - } - - a = new int[numBitVectors]; - b = new int[numBitVectors]; - - /* Use a large prime number as a seed to the random number generator. - * Java's random number generator uses the Linear Congruential Generator to generate random - * numbers using the following recurrence relation, - * - * X(n+1) = (a X(n) + c ) mod m - * - * where X0 is the seed. Java implementation uses m = 2^48. This is problematic because 2^48 - * is not a prime number and hence the set of numbers from 0 to m don't form a finite field. - * If these numbers don't come from a finite field any give X(n) and X(n+1) may not be pair - * wise independent. - * - * However, empirically passing in prime numbers as seeds seems to work better than when passing - * composite numbers as seeds. Ideally Java's Random should pick m such that m is prime. - * - */ - aValue = new Random(99397); - bValue = new Random(9876413); - - for (int i = 0; i < numBitVectors; i++) { - int randVal; - /* a and b shouldn't be even; If a and b are even, then none of the values - * will set bit 0 thus introducing errors in the estimate. Both a and b can be even - * 25% of the times and as a result 25% of the bit vectors could be inaccurate. To avoid this - * always pick odd values for a and b. 
- */ - do { - randVal = aValue.nextInt(); - } while (randVal % 2 == 0); - - a[i] = randVal; - - do { - randVal = bValue.nextInt(); - } while (randVal % 2 == 0); - - b[i] = randVal; - - if (a[i] < 0) { - a[i] = a[i] + (1 << BIT_VECTOR_SIZE - 1); - } - - if (b[i] < 0) { - b[i] = b[i] + (1 << BIT_VECTOR_SIZE - 1); - } - } - } - - /** - * Resets a distinctValueEstimator object to its original state. - */ - public void reset() { - for (int i=0; i< numBitVectors; i++) { - bitVector[i].clear(); - } - } - - public FastBitSet getBitVector(int index) { - return bitVector[index]; - } - - public FastBitSet setBitVector(FastBitSet fastBitSet, int index) { - return bitVector[index] = fastBitSet; - } - - public int getNumBitVectors() { - return numBitVectors; - } - - public int getBitVectorSize() { - return BIT_VECTOR_SIZE; - } - - public void printNumDistinctValueEstimator() { - String t = new String(); - - LOG.debug("NumDistinctValueEstimator"); - LOG.debug("Number of Vectors: {}", numBitVectors); - LOG.debug("Vector Size: {}", BIT_VECTOR_SIZE); - - for (int i=0; i < numBitVectors; i++) { - t = t + bitVector[i].toString(); - } - - LOG.debug("Serialized Vectors: "); - LOG.debug(t); - } - - @Override - public byte[] serialize() { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - // write bytes to bos ... 
- try { - FMSketchUtils.serializeFM(bos, this); - final byte[] result = bos.toByteArray(); - bos.close(); - return result; - } catch (IOException e) { - throw new RuntimeException(e); - } - }

  /**
   * Rebuilds an estimator from its serialized byte form by delegating to
   * {@link FMSketchUtils#deserializeFM(InputStream)}.
   *
   * @param buf serialized sketch bytes
   * @return the decoded estimator
   * @throws RuntimeException wrapping any {@link IOException} raised while decoding
   */
  @Override
  public NumDistinctValueEstimator deserialize(byte[] buf) {
    // try-with-resources closes the stream on the failure path as well.
    try (InputStream is = new ByteArrayInputStream(buf)) {
      return FMSketchUtils.deserializeFM(is);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Applies the {@code hashNum}-th linear hash ({@code a[hashNum] * v + b[hashNum]}) and
   * reduces it modulo {@code (1 << BIT_VECTOR_SIZE) - 1}.
   *
   * @param v       value to hash
   * @param hashNum index of the coefficient pair to use
   * @return a non-negative hash value
   */
  private int generateHash(long v, int hashNum) {
    int mod = (1 << BIT_VECTOR_SIZE) - 1;
    long tempHash = a[hashNum] * v + b[hashNum];
    tempHash %= mod;
    int hash = (int) tempHash;

    /* Hash function should map the long value to 0...2^L-1.
     * Hence hash value has to be non-negative.
     */
    if (hash < 0) {
      hash = hash + mod;
    }
    return hash;
  }

  /**
   * Hash used by the PCSA variant; only the first coefficient pair ({@code a[0]}, {@code b[0]})
   * is used.
   *
   * @param v value to hash
   * @return a non-negative hash value
   */
  private int generateHashForPCSA(long v) {
    // NOTE(review): the original expression was "1 << (BIT_VECTOR_SIZE - 1) - 1". Because '-'
    // binds tighter than '<<' it evaluates to 1 << (BIT_VECTOR_SIZE - 2), not
    // (1 << (BIT_VECTOR_SIZE - 1)) - 1 as generateHash() would suggest. The effective value is
    // kept (now spelled out) because changing the modulus changes which bits a given input
    // sets; confirm the intent before altering it.
    int mod = 1 << ((BIT_VECTOR_SIZE - 1) - 1);
    long tempHash = a[0] * v + b[0];
    tempHash %= mod;
    int hash = (int) tempHash;

    /* Hash function should map the long value to 0...2^L-1.
     * Hence hash value has to be non-negative.
     */
    if (hash < 0) {
      hash = hash + mod + 1;
    }
    return hash;
  }

  /**
   * Returns the index of the least significant 1 bit of {@code value}, capped at
   * {@code BIT_VECTOR_SIZE} (which is also the result when {@code value} is 0).
   */
  private static int lowestSetBitIndex(int value) {
    // numberOfTrailingZeros(0) is 32, so the cap also covers the "no bit set" case.
    return Math.min(Integer.numberOfTrailingZeros(value), BIT_VECTOR_SIZE);
  }

  /**
   * Adds a long value to the sketch.
   *
   * For every bit vector the value is hashed with that vector's hash function and the bit at
   * the position of the hash's least significant 1 bit is set.
   *
   * @param v value to add
   */
  public void addToEstimator(long v) {
    for (int i = 0; i < numBitVectors; i++) {
      int hash = generateHash(v, i);
      // Set bitvector[index] := 1, where index is the least significant 1 bit of the hash.
      bitVector[i].set(lowestSetBitIndex(hash));
    }
  }

  /**
   * Adds a long value using the PCSA scheme: a single hash is split into a bit-vector selector
   * ({@code hash % numBitVectors}) and a remainder ({@code hash / numBitVectors}) whose least
   * significant 1 bit gives the position to set.
   *
   * @param v value to add
   */
  public void addToEstimatorPCSA(long v) {
    int hash = generateHashForPCSA(v);
    int rho = hash / numBitVectors;
    bitVector[hash % numBitVectors].set(lowestSetBitIndex(rho));
  }

  /** Adds a double by feeding its {@link Double#hashCode()} to {@link #addToEstimator(long)}. */
  public void addToEstimator(double d) {
    // The explicit cast pins the long overload (the hash must not be re-boxed as a double).
    addToEstimator((long) Double.valueOf(d).hashCode());
  }

  /** Adds a double by feeding its {@link Double#hashCode()} to {@link #addToEstimatorPCSA(long)}. */
  public void addToEstimatorPCSA(double d) {
    addToEstimatorPCSA((long) Double.valueOf(d).hashCode());
  }

  /** Adds a decimal by feeding its {@code hashCode()} to {@link #addToEstimator(long)}. */
  public void addToEstimator(HiveDecimal decimal) {
    addToEstimator((long) decimal.hashCode());
  }

  /** Adds a decimal by feeding its {@code hashCode()} to {@link #addToEstimatorPCSA(long)}. */
  public void addToEstimatorPCSA(HiveDecimal decimal) {
    addToEstimatorPCSA((long) decimal.hashCode());
  }

  /**
   * Merges another sketch into this one.
   *
   * @param o sketch to merge; it is read for indexes {@code 0..numBitVectors-1} of this sketch
   */
  public void mergeEstimators(FMSketch o) {
    // Bitwise OR the bitvector with the bitvector in the agg buffer
    for (int i = 0; i < numBitVectors; i++) {
      bitVector[i].or(o.getBitVector(i));
    }
  }

  /**
   * Estimates the number of distinct values with the PCSA formula
   * {@code (m / PHI) * 2^(S / m)}, where {@code m} is the number of bit vectors and {@code S}
   * is the sum over all vectors of the index of the first clear bit.
   *
   * @return the estimate, truncated to a long
   */
  public long estimateNumDistinctValuesPCSA() {
    long sumOfFirstZeroIndexes = 0;

    for (int i = 0; i < numBitVectors; i++) {
      int index = 0;
      // Check the bound before touching the bit set.
      while (index < BIT_VECTOR_SIZE && bitVector[i].get(index)) {
        index++;
      }
      sumOfFirstZeroIndexes += index;
    }

    // The mean must be computed in floating point: integer division would truncate the
    // exponent and coarsen the estimate.
    double meanIndex = (double) sumOfFirstZeroIndexes / numBitVectors;
    double numDistinctValues = (numBitVectors / PHI) * Math.pow(2.0, meanIndex);
    return (long) numDistinctValues;
  }

  /* We use the Flajolet-Martin estimator to estimate the number of distinct values. FM uses the
   * location of the least significant zero as an estimate of log2(phi*ndvs).
   */
  public long estimateNumDistinctValues() {
    int sumLeastSigZero = 0;

    for (int i = 0; i < numBitVectors; i++) {
      sumLeastSigZero += bitVector[i].nextClearBit(0);
    }

    double avgLeastSigZero =
        sumLeastSigZero / (numBitVectors * 1.0) - (Math.log(PHI) / Math.log(2.0));
    double numDistinctValues = Math.pow(2.0, avgLeastSigZero);
    return (long) numDistinctValues;
  }

  /**
   * Sums the {@code model} sizes of the parts of a sketch with {@code numVector} bit vectors.
   *
   * @param model     size model to query
   * @param numVector number of bit vectors; {@code null} is treated as 16
   * @return the summed length
   */
  @InterfaceAudience.LimitedPrivate(value = { "Hive" })
  static int lengthFor(JavaDataModel model, Integer numVector) {
    int length = model.object();
    length += model.primitive1() * 2;       // two int
    length += model.primitive2();           // one double
    length += model.lengthForRandom() * 2;  // two Random

    if (numVector == null) {
      numVector = 16; // HiveConf hive.stats.ndv.error default produces 16 vectors
    }

    if (numVector > 0) {
      length += model.array() * 3;                    // three array
      length += model.primitive1() * numVector * 2;   // two int array
      length += (model.object() + model.array() + model.primitive1() +
          model.primitive2()) * numVector;            // bitset array
    }
    return length;
  }

  /** Same as {@link #lengthFor(JavaDataModel, Integer)} for this sketch's bit-vector count. */
  public int lengthFor(JavaDataModel model) {
    return lengthFor(model, getNumBitVectors());
  }

  /**
   * Merges another estimator into this one.
   *
   * The caller needs to guarantee that {@code o} is an {@link FMSketch} with the same number of
   * bit vectors (see {@link #canMerge(NumDistinctValueEstimator)}).
   *
   * @throws ClassCastException if {@code o} is not an {@link FMSketch}
   */
  @Override
  public void mergeEstimators(NumDistinctValueEstimator o) {
    mergeEstimators((FMSketch) o);
  }

  /** Adds a string by feeding its {@code hashCode()} to {@link #addToEstimator(long)}. */
  @Override
  public void addToEstimator(String s) {
    addToEstimator((long) s.hashCode());
  }

  /** Returns true when {@code o} is an {@link FMSketch} with the same number of bit vectors. */
  @Override
  public boolean canMerge(NumDistinctValueEstimator o) {
    return o instanceof FMSketch && this.numBitVectors == ((FMSketch) o).numBitVectors;
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketchUtils.java
---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketchUtils.java b/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketchUtils.java deleted file mode 100644 index 0150678..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/ndv/fm/FMSketchUtils.java +++ /dev/null @@ -1,132 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.common.ndv.fm;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javolution.util.FastBitSet;

/**
 * Binary serialization helpers for {@link FMSketch}.
 *
 * <p>Stream layout, all multi-byte values little-endian:
 * <ul>
 *   <li>2 bytes - the magic bytes {@code 'F', 'M'} identifying the stream</li>
 *   <li>2 bytes - the number of bit vectors</li>
 *   <li>4 bytes per bit vector - bit {@code p} of the 32-bit word is set when position
 *       {@code p} of the vector is set</li>
 * </ul>
 */
public class FMSketchUtils {

  // NOTE(review): the logger is registered under FMSketch's name rather than FMSketchUtils';
  // left as-is because renaming it would change the log category - confirm whether intended.
  static final Logger LOG = LoggerFactory.getLogger(FMSketch.class.getName());
  public static final byte[] MAGIC = new byte[] { 'F', 'M' };

  /** Largest bit-vector count that fits in the 2-byte header field. */
  private static final int MAX_NUM_BIT_VECTORS = 0xFFFF;

  /**
   * Writes {@code fm} to {@code out} in the layout described on the class.
   *
   * @param out destination stream; it is not closed
   * @param fm  sketch to serialize
   * @throws IOException if writing to {@code out} fails
   * @throws IllegalArgumentException if the sketch has more bit vectors than the 2-byte
   *         header field can represent
   */
  public static void serializeFM(OutputStream out, FMSketch fm) throws IOException {
    final int numBitVectors = fm.getNumBitVectors();
    if (numBitVectors > MAX_NUM_BIT_VECTORS) {
      // Without this check the count would be silently truncated and the stream unreadable.
      throw new IllegalArgumentException("Cannot serialize FMSketch with " + numBitVectors
          + " bit vectors; the maximum is " + MAX_NUM_BIT_VECTORS);
    }

    out.write(MAGIC);

    byte[] nbv = new byte[2];
    nbv[0] = (byte) numBitVectors;
    nbv[1] = (byte) (numBitVectors >>> 8);
    out.write(nbv);

    // The original toString form takes too much space, so each bit set is packed into 4 bytes.
    for (int i = 0; i < numBitVectors; i++) {
      writeBitVector(out, fm.getBitVector(i));
    }
  }

  /**
   * Packs positions {@code 0..FMSketch.BIT_VECTOR_SIZE-1} of {@code bit} into one 32-bit word
   * and writes it as 4 little-endian bytes.
   */
  private static void writeBitVector(OutputStream out, FastBitSet bit) throws IOException {
    int packed = 0;
    for (int pos = 0; pos < FMSketch.BIT_VECTOR_SIZE; pos++) {
      if (bit.get(pos)) {
        packed |= 1 << pos;
      }
    }
    byte[] word = new byte[4];
    for (int j = 0; j < 4; j++) {
      word[j] = (byte) ((packed >>> (8 * j)) & 0xff);
    }
    out.write(word);
  }

  /**
   * Decodes a sketch from a byte array.
   *
   * @param buf serialized sketch bytes
   * @return the decoded sketch
   * @throws IllegalArgumentException if {@code buf} does not start with the magic bytes
   * @throws RuntimeException wrapping the {@link IOException} raised when {@code buf} ends
   *         before the whole sketch has been read
   * @throws IOException declared for signature compatibility; I/O failures are wrapped as
   *         described above
   */
  public static FMSketch deserializeFM(byte[] buf) throws IOException {
    // try-with-resources closes the stream on the failure path as well.
    try (InputStream is = new ByteArrayInputStream(buf)) {
      return deserializeFM(is);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Decodes a sketch from a stream positioned at the magic bytes.
   *
   * @param in source stream; it is not closed
   * @return the decoded sketch
   * @throws IllegalArgumentException if the stream does not start with the magic bytes
   * @throws IOException if reading fails or the stream ends before the sketch is complete
   */
  public static FMSketch deserializeFM(InputStream in) throws IOException {
    checkMagicString(in);

    int low = readByte(in);
    int high = readByte(in);
    int numBitVectors = low | (high << 8);

    FMSketch sketch = new FMSketch(numBitVectors);
    for (int n = 0; n < numBitVectors; n++) {
      sketch.setBitVector(readBitVector(in), n);
    }
    return sketch;
  }

  /** Reads 4 little-endian bytes and returns a bit set with the corresponding positions set. */
  private static FastBitSet readBitVector(InputStream in) throws IOException {
    FastBitSet fastBitSet = new FastBitSet();
    fastBitSet.clear();
    for (int i = 0; i < 4; i++) {
      int b = readByte(in);
      for (int j = 0; j < 8; j++) {
        if ((b & (1 << j)) != 0) {
          fastBitSet.set(j + 8 * i);
        }
      }
    }
    return fastBitSet;
  }

  /**
   * Reads one byte as an unsigned value.
   *
   * @throws IOException if the stream is exhausted; {@code InputStream.read()} signals that
   *         with -1, which must not be mistaken for the data byte 0xFF
   */
  private static int readByte(InputStream in) throws IOException {
    int value = in.read();
    if (value < 0) {
      throw new IOException("Unexpected end of stream while reading a FMSketch.");
    }
    return value;
  }

  /**
   * Consumes two bytes and verifies that they equal {@link #MAGIC}.
   *
   * @throws IllegalArgumentException if the bytes differ from the magic bytes; a stream that
   *         ends early is reported the same way
   */
  private static void checkMagicString(InputStream in) throws IOException {
    byte[] magic = new byte[2];
    magic[0] = (byte) in.read();
    magic[1] = (byte) in.read();

    if (!Arrays.equals(magic, MAGIC)) {
      throw new IllegalArgumentException("The input stream is not a FMSketch stream.");
    }
  }
}
