http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java new file mode 100644 index 0000000..a76b848 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.cache; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public class SharedCache { + private Map<String, Database> databaseCache = new TreeMap<>(); + private Map<String, TableWrapper> tableCache = new TreeMap<>(); + private Map<String, PartitionWrapper> partitionCache = new TreeMap<>(); + private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>(); + private Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>(); + private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>(); + private static MessageDigest md; + + private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class); + + 
static { + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("should not happen", e); + } + } + + public synchronized Database getDatabaseFromCache(String name) { + return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null; + } + + public synchronized void addDatabaseToCache(String dbName, Database db) { + Database dbCopy = db.deepCopy(); + dbCopy.setName(StringUtils.normalizeIdentifier(dbName)); + databaseCache.put(dbName, dbCopy); + } + + public synchronized void removeDatabaseFromCache(String dbName) { + databaseCache.remove(dbName); + } + + public synchronized List<String> listCachedDatabases() { + return new ArrayList<>(databaseCache.keySet()); + } + + public synchronized void alterDatabaseInCache(String dbName, Database newDb) { + removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName)); + addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy()); + } + + public synchronized int getCachedDatabaseCount() { + return databaseCache.size(); + } + + public synchronized Table getTableFromCache(String dbName, String tableName) { + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper == null) { + return null; + } + Table t = CacheUtils.assemble(tblWrapper, this); + return t; + } + + public synchronized void addTableToCache(String dbName, String tblName, Table tbl) { + Table tblCopy = tbl.deepCopy(); + tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName)); + tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName)); + if (tblCopy.getPartitionKeys() != null) { + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(StringUtils.normalizeIdentifier(fs.getName())); + } + } + TableWrapper wrapper; + if (tbl.getSd() != null) { + byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md); + StorageDescriptor sd = tbl.getSd(); + increSd(sd, sdHash); + tblCopy.setSd(null); + 
wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new TableWrapper(tblCopy, null, null, null); + } + tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); + } + + public synchronized void removeTableFromCache(String dbName, String tblName) { + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + byte[] sdHash = tblWrapper.getSdHash(); + if (sdHash!=null) { + decrSd(sdHash); + } + } + + public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { + return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null; + } + + public synchronized void removeTableColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + tableColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + public synchronized void removeTableColStatsFromCache(String dbName, String tblName, + String colName) { + tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + } + + public synchronized void updateTableColStatsInCache(String dbName, String tableName, + List<ColumnStatisticsObj> colStatsForTable) { + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + // Get old stats object if present + String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); + if (oldStatsObj != null) { + LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName() + + ", of table: " + tableName + " and database: " + dbName); + // Update existing stat object's field + 
StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + tableColStatsCache.put(key, colStatObj); + } + } + } + + public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { + removeTableFromCache(dbName, tblName); + addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()), + StringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + } + + public synchronized void alterTableInPartitionCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + List<Partition> partitions = listCachedPartitions(dbName, tblName, -1); + for (Partition part : partitions) { + removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues()); + part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName())); + part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName())); + addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()), + StringUtils.normalizeIdentifier(newTable.getTableName()), part); + } + } + } + + public synchronized void alterTableInTableColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + tableColStatsCache.entrySet().iterator(); + Map<String, ColumnStatisticsObj> newTableColStats = + new HashMap<>(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) { + String[] decomposedKey = CacheUtils.splitTableColStats(key); + String newKey = 
CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]); + newTableColStats.put(newKey, colStatObj); + iterator.remove(); + } + } + tableColStatsCache.putAll(newTableColStats); + } + } + + public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + List<Partition> partitions = listCachedPartitions(dbName, tblName, -1); + Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>(); + for (Partition part : partitions) { + String oldPartialPartitionKey = + CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues()); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + String newKey = + CacheUtils.buildKey((String) decomposedKey[0], (String) decomposedKey[1], + (List<String>) decomposedKey[2], (String) decomposedKey[3]); + newPartitionColStats.put(newKey, colStatObj); + iterator.remove(); + } + } + } + partitionColStatsCache.putAll(newPartitionColStats); + } + } + + public synchronized int getCachedTableCount() { + return tableCache.size(); + } + + public synchronized List<Table> listCachedTables(String dbName) { + List<Table> tables = new ArrayList<>(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName)) { + tables.add(CacheUtils.assemble(wrapper, this)); + } + } + return tables; + } + + public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) { + List<TableMeta> tableMetas = new ArrayList<>(); + for (String dbName : 
listCachedDatabases()) { + if (CacheUtils.matches(dbName, dbNames)) { + for (Table table : listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), tableNames)) { + if (tableTypes==null || tableTypes.contains(table.getTableType())) { + TableMeta metaData = new TableMeta( + dbName, table.getTableName(), table.getTableType()); + metaData.setComments(table.getParameters().get("comment")); + tableMetas.add(metaData); + } + } + } + } + } + return tableMetas; + } + + public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { + Partition partCopy = part.deepCopy(); + PartitionWrapper wrapper; + if (part.getSd()!=null) { + byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md); + StorageDescriptor sd = part.getSd(); + increSd(sd, sdHash); + partCopy.setSd(null); + wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new PartitionWrapper(partCopy, null, null, null); + } + partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper); + } + + public synchronized Partition getPartitionFromCache(String key) { + PartitionWrapper wrapper = partitionCache.get(key); + if (wrapper == null) { + return null; + } + Partition p = CacheUtils.assemble(wrapper, this); + return p; + } + + public synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) { + return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals)); + } + + public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) { + return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); + } + + public synchronized Partition removePartitionFromCache(String dbName, String tblName, + List<String> part_vals) { + PartitionWrapper wrapper = + partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); + if (wrapper.getSdHash() != null) { + 
decrSd(wrapper.getSdHash()); + } + return wrapper.getPartition(); + } + + // Remove cached column stats for all partitions of a table + public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + // Remove cached column stats for a particular partition of a table + public synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + List<String> partVals) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + // Remove cached column stats for a particular partition and a particular column of a table + public synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + List<String> partVals, String colName) { + partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName)); + } + + public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) { + List<Partition> partitions = new ArrayList<>(); + int count = 0; + for (PartitionWrapper wrapper : partitionCache.values()) { + if (wrapper.getPartition().getDbName().equals(dbName) + && wrapper.getPartition().getTableName().equals(tblName) + && (max == -1 || count < max)) { + partitions.add(CacheUtils.assemble(wrapper, this)); + count++; + } + 
} + return partitions; + } + + public synchronized void alterPartitionInCache(String dbName, String tblName, + List<String> partVals, Partition newPart) { + removePartitionFromCache(dbName, tblName, partVals); + addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()), + StringUtils.normalizeIdentifier(newPart.getTableName()), newPart); + } + + public synchronized void alterPartitionInColStatsCache(String dbName, String tblName, + List<String> partVals, Partition newPart) { + String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); + Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>(); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + String newKey = + CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()), + StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(), + (String) decomposedKey[3]); + newPartitionColStats.put(newKey, colStatObj); + iterator.remove(); + } + } + partitionColStatsCache.putAll(newPartitionColStats); + } + + public synchronized void updatePartitionColStatsInCache(String dbName, String tableName, + List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) { + for (ColumnStatisticsObj colStatObj : colStatsObjs) { + // Get old stats object if present + String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + LOG.debug("CachedStore: updating partition column stats for column: " + + 
colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName); + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + partitionColStatsCache.put(key, colStatObj); + } + } + } + + public synchronized int getCachedPartitionCount() { + return partitionCache.size(); + } + + public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { + return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null; + } + + public synchronized void addPartitionColStatsToCache(String dbName, String tableName, + Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) { + for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStatsPerPartition.entrySet()) { + String partName = entry.getKey(); + try { + List<String> partVals = Warehouse.getPartValuesFromPartName(partName); + for (ColumnStatisticsObj colStatObj : entry.getValue()) { + String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); + partitionColStatsCache.put(key, colStatObj); + } + } catch (MetaException e) { + LOG.info("Unable to add partition: " + partName + " to SharedCache", e); + } + } + } + + public synchronized void refreshPartitionColStats(String dbName, String tableName, + Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) { + LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName + + " and table: " + tableName); + removePartitionColStatsFromCache(dbName, tableName); + addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); + } + + public synchronized void addTableColStatsToCache(String dbName, String tableName, + List<ColumnStatisticsObj> colStatsForTable) { + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); + tableColStatsCache.put(key, colStatObj); + } + } + + 
public synchronized void refreshTableColStats(String dbName, String tableName, + List<ColumnStatisticsObj> colStatsForTable) { + LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName + + " and table: " + tableName); + // Remove all old cache entries for this table + removeTableColStatsFromCache(dbName, tableName); + // Add new entries to cache + addTableColStatsToCache(dbName, tableName, colStatsForTable); + } + + public void increSd(StorageDescriptor sd, byte[] sdHash) { + ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); + if (sdCache.containsKey(byteArray)) { + sdCache.get(byteArray).refCount++; + } else { + StorageDescriptor sdToCache = sd.deepCopy(); + sdToCache.setLocation(null); + sdToCache.setParameters(null); + sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1)); + } + } + + public void decrSd(byte[] sdHash) { + ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); + StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray); + sdWrapper.refCount--; + if (sdWrapper.getRefCount() == 0) { + sdCache.remove(byteArray); + } + } + + public StorageDescriptor getSdFromCache(byte[] sdHash) { + StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash)); + return sdWrapper.getSd(); + } + + // Replace databases in databaseCache with the new list + public synchronized void refreshDatabases(List<Database> databases) { + LOG.debug("CachedStore: updating cached database objects"); + for (String dbName : listCachedDatabases()) { + removeDatabaseFromCache(dbName); + } + for (Database db : databases) { + addDatabaseToCache(db.getName(), db); + } + } + + // Replace tables in tableCache with the new list + public synchronized void refreshTables(String dbName, List<Table> tables) { + LOG.debug("CachedStore: updating cached table objects for database: " + dbName); + for (Table tbl : listCachedTables(dbName)) { + removeTableFromCache(dbName, tbl.getTableName()); + } + for (Table tbl : tables) 
{ + addTableToCache(dbName, tbl.getTableName(), tbl); + } + } + + public synchronized void refreshPartitions(String dbName, String tblName, + List<Partition> partitions) { + LOG.debug("CachedStore: updating cached partition objects for database: " + dbName + + " and table: " + tblName); + Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator(); + while (iterator.hasNext()) { + PartitionWrapper partitionWrapper = iterator.next().getValue(); + if (partitionWrapper.getPartition().getDbName().equals(dbName) + && partitionWrapper.getPartition().getTableName().equals(tblName)) { + iterator.remove(); + } + } + for (Partition part : partitions) { + addPartitionToCache(dbName, tblName, part); + } + } + + @VisibleForTesting + Map<String, Database> getDatabaseCache() { + return databaseCache; + } + + @VisibleForTesting + Map<String, TableWrapper> getTableCache() { + return tableCache; + } + + @VisibleForTesting + Map<String, PartitionWrapper> getPartitionCache() { + return partitionCache; + } + + @VisibleForTesting + Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() { + return sdCache; + } + + @VisibleForTesting + Map<String, ColumnStatisticsObj> getPartitionColStatsCache() { + return partitionColStatsCache; + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java new file mode 100644 index 0000000..45d5d8c --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.columnstats.aggr; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; + +public class BinaryColumnStatsAggregator extends ColumnStatsAggregator { + + @Override + public ColumnStatisticsObj aggregate(String colName, List<String> partNames, + List<ColumnStatistics> css) throws MetaException { + ColumnStatisticsObj statsObj = null; + BinaryColumnStatsData aggregateData = null; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats(); + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); + aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + columnStatisticsData.setBinaryStats(aggregateData); + statsObj.setStatsData(columnStatisticsData); + return statsObj; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java 
---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java new file mode 100644 index 0000000..8aac0fe --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.columnstats.aggr; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; + +public class BooleanColumnStatsAggregator extends ColumnStatsAggregator { + + @Override + public ColumnStatisticsObj aggregate(String colName, List<String> partNames, + List<ColumnStatistics> css) throws MetaException { + ColumnStatisticsObj statsObj = null; + BooleanColumnStatsData aggregateData = null; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats(); + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues()); + aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses()); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + columnStatisticsData.setBooleanStats(aggregateData); + statsObj.setStatsData(columnStatisticsData); + return statsObj; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java 
---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java new file mode 100644 index 0000000..cd0392d --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.columnstats.aggr; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; + +public abstract class ColumnStatsAggregator { + public boolean useDensityFunctionForNDVEstimation; + public double ndvTuner; + public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames, + List<ColumnStatistics> css) throws MetaException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java new file mode 100644 index 0000000..7aaab4a --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.columnstats.aggr; + +import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector; + +public class ColumnStatsAggregatorFactory { + + private ColumnStatsAggregatorFactory() { + } + + public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, + boolean useDensityFunctionForNDVEstimation, double ndvTuner) { + ColumnStatsAggregator agg; + switch (type) { + case BOOLEAN_STATS: + agg = new BooleanColumnStatsAggregator(); + break; + case LONG_STATS: + agg = new LongColumnStatsAggregator(); + break; + case DATE_STATS: + agg = new DateColumnStatsAggregator(); + break; + case DOUBLE_STATS: + agg = new DoubleColumnStatsAggregator(); + break; + case STRING_STATS: + agg = new StringColumnStatsAggregator(); + break; + case BINARY_STATS: + agg = new BinaryColumnStatsAggregator(); + break; + case DECIMAL_STATS: + agg = new DecimalColumnStatsAggregator(); + break; + default: + throw new RuntimeException("Woh, bad. 
Unknown stats type " + type.toString()); + } + agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation; + agg.ndvTuner = ndvTuner; + return agg; + } + + public static ColumnStatisticsObj newColumnStaticsObj(String colName, String colType, _Fields type) { + ColumnStatisticsObj cso = new ColumnStatisticsObj(); + ColumnStatisticsData csd = new ColumnStatisticsData(); + cso.setColName(colName); + cso.setColType(colType); + switch (type) { + case BOOLEAN_STATS: + csd.setBooleanStats(new BooleanColumnStatsData()); + break; + + case LONG_STATS: + csd.setLongStats(new LongColumnStatsDataInspector()); + break; + + case DATE_STATS: + csd.setDateStats(new DateColumnStatsDataInspector()); + break; + + case DOUBLE_STATS: + csd.setDoubleStats(new DoubleColumnStatsDataInspector()); + break; + + case STRING_STATS: + csd.setStringStats(new StringColumnStatsDataInspector()); + break; + + case BINARY_STATS: + csd.setBinaryStats(new BinaryColumnStatsData()); + break; + + case DECIMAL_STATS: + csd.setDecimalStats(new DecimalColumnStatsDataInspector()); + break; + + default: + throw new RuntimeException("Woh, bad. 
Unknown stats type!"); + } + + cso.setStatsData(csd); + return cso; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java new file mode 100644 index 0000000..7f29561 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.columnstats.aggr; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; +import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Date; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DateColumnStatsAggregator extends ColumnStatsAggregator implements + IExtrapolatePartStatus { + + private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class); + + @Override + public ColumnStatisticsObj aggregate(String colName, List<String> partNames, + List<ColumnStatistics> css) throws MetaException { + ColumnStatisticsObj statsObj = null; + + // check if all the ColumnStatisticsObjs contain stats and all the ndv are + // bitvectors + boolean doAllPartitionContainStats = partNames.size() == css.size(); + LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + NumDistinctValueEstimator ndvEstimator = null; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = 
ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + DateColumnStatsDataInspector dateColumnStats = + (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); + if (dateColumnStats.getNdvEstimator() == null) { + ndvEstimator = null; + break; + } else { + // check if all of the bit vectors can merge + NumDistinctValueEstimator estimator = dateColumnStats.getNdvEstimator(); + if (ndvEstimator == null) { + ndvEstimator = estimator; + } else { + if (ndvEstimator.canMerge(estimator)) { + continue; + } else { + ndvEstimator = null; + break; + } + } + } + } + if (ndvEstimator != null) { + ndvEstimator = NumDistinctValueEstimatorFactory + .getEmptyNumDistinctValueEstimator(ndvEstimator); + } + LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + if (doAllPartitionContainStats || css.size() < 2) { + DateColumnStatsDataInspector aggregateData = null; + long lowerBound = 0; + long higherBound = 0; + double densityAvgSum = 0.0; + for (ColumnStatistics cs : css) { + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DateColumnStatsDataInspector newData = + (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); + lowerBound = Math.max(lowerBound, newData.getNumDVs()); + higherBound += newData.getNumDVs(); + densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue())) + / newData.getNumDVs(); + if (ndvEstimator != null) { + ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + } + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData + .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + 
aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } + } + if (ndvEstimator != null) { + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to + // use uniform distribution assumption because we can merge bitvectors + // to get a good estimation. + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + } else { + long estimation; + if (useDensityFunctionForNDVEstimation) { + // We have estimation, lowerbound and higherbound. We use estimation + // if it is between lowerbound and higherbound. + double densityAvg = densityAvgSum / partNames.size(); + estimation = (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg); + if (estimation < lowerBound) { + estimation = lowerBound; + } else if (estimation > higherBound) { + estimation = higherBound; + } + } else { + estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner); + } + aggregateData.setNumDVs(estimation); + } + columnStatisticsData.setDateStats(aggregateData); + } else { + // we need extrapolation + LOG.debug("start extrapolation for " + colName); + + Map<String, Integer> indexMap = new HashMap<>(); + for (int index = 0; index < partNames.size(); index++) { + indexMap.put(partNames.get(index), index); + } + Map<String, Double> adjustedIndexMap = new HashMap<>(); + Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>(); + // while we scan the css, we also get the densityAvg, lowerbound and + // higerbound when useDensityFunctionForNDVEstimation is true. + double densityAvgSum = 0.0; + if (ndvEstimator == null) { + // if not every partition uses bitvector for ndv, we just fall back to + // the traditional extrapolation methods. 
+ for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DateColumnStatsData newData = cso.getStatsData().getDateStats(); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs(); + } + adjustedIndexMap.put(partName, (double) indexMap.get(partName)); + adjustedStatsMap.put(partName, cso.getStatsData()); + } + } else { + // we first merge all the adjacent bitvectors that we could merge and + // derive new partition names and index. + StringBuilder pseudoPartName = new StringBuilder(); + double pseudoIndexSum = 0; + int length = 0; + int curIndex = -1; + DateColumnStatsDataInspector aggregateData = null; + for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DateColumnStatsDataInspector newData = + (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); + // newData.isSetBitVectors() should be true for sure because we + // already checked it before. + if (indexMap.get(partName) != curIndex) { + // There is bitvector, but it is not adjacent to the previous ones. 
+ if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setDateStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue()) + / aggregateData.getNumDVs(); + } + // reset everything + pseudoPartName = new StringBuilder(); + pseudoIndexSum = 0; + length = 0; + ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + } + aggregateData = null; + } + curIndex = indexMap.get(partName); + pseudoPartName.append(partName); + pseudoIndexSum += curIndex; + length++; + curIndex++; + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + } + if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setDateStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue()) + / aggregateData.getNumDVs(); + } + } + } + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + } + LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of 
partitions found: {}", colName, + columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size()); + statsObj.setStatsData(columnStatisticsData); + return statsObj; + } + + private long diff(Date d1, Date d2) { + return d1.getDaysSinceEpoch() - d2.getDaysSinceEpoch(); + } + + private Date min(Date d1, Date d2) { + return d1.compareTo(d2) < 0 ? d1 : d2; + } + + private Date max(Date d1, Date d2) { + return d1.compareTo(d2) < 0 ? d2 : d1; + } + + @Override + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, + int numPartsWithStats, Map<String, Double> adjustedIndexMap, + Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) { + int rightBorderInd = numParts; + DateColumnStatsDataInspector extrapolateDateData = new DateColumnStatsDataInspector(); + Map<String, DateColumnStatsData> extractedAdjustedStatsMap = new HashMap<>(); + for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) { + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDateStats()); + } + List<Map.Entry<String, DateColumnStatsData>> list = new LinkedList<>( + extractedAdjustedStatsMap.entrySet()); + // get the lowValue + Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() { + @Override + public int compare(Map.Entry<String, DateColumnStatsData> o1, + Map.Entry<String, DateColumnStatsData> o2) { + return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue()); + } + }); + double minInd = adjustedIndexMap.get(list.get(0).getKey()); + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + long lowValue = 0; + long min = list.get(0).getValue().getLowValue().getDaysSinceEpoch(); + long max = list.get(list.size() - 1).getValue().getLowValue().getDaysSinceEpoch(); + if (minInd == maxInd) { + lowValue = min; + } else if (minInd < maxInd) { + // left border is the min + lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd)); + } else { + // 
right border is the min + lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd)); + } + + // get the highValue + Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() { + @Override + public int compare(Map.Entry<String, DateColumnStatsData> o1, + Map.Entry<String, DateColumnStatsData> o2) { + return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue()); + } + }); + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + long highValue = 0; + min = list.get(0).getValue().getHighValue().getDaysSinceEpoch(); + max = list.get(list.size() - 1).getValue().getHighValue().getDaysSinceEpoch(); + if (minInd == maxInd) { + highValue = min; + } else if (minInd < maxInd) { + // right border is the max + highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + highValue = (long) (min + (max - min) * minInd / (minInd - maxInd)); + } + + // get the #nulls + long numNulls = 0; + for (Map.Entry<String, DateColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) { + numNulls += entry.getValue().getNumNulls(); + } + // we scale up sumNulls based on the number of partitions + numNulls = numNulls * numParts / numPartsWithStats; + + // get the ndv + long ndv = 0; + Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() { + @Override + public int compare(Map.Entry<String, DateColumnStatsData> o1, + Map.Entry<String, DateColumnStatsData> o2) { + return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); + } + }); + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); + long higherBound = 0; + for (Map.Entry<String, DateColumnStatsData> entry : list) { + higherBound += entry.getValue().getNumDVs(); + } + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) { + ndv = (long) ((highValue - lowValue) / densityAvg); + 
if (ndv < lowerBound) { + ndv = lowerBound; + } else if (ndv > higherBound) { + ndv = higherBound; + } + } else { + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + min = list.get(0).getValue().getNumDVs(); + max = list.get(list.size() - 1).getValue().getNumDVs(); + if (minInd == maxInd) { + ndv = min; + } else if (minInd < maxInd) { + // right border is the max + ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + ndv = (long) (min + (max - min) * minInd / (minInd - maxInd)); + } + } + extrapolateDateData.setLowValue(new Date(lowValue)); + extrapolateDateData.setHighValue(new Date(highValue)); + extrapolateDateData.setNumNulls(numNulls); + extrapolateDateData.setNumDVs(ndv); + extrapolateData.setDateStats(extrapolateDateData); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java new file mode 100644 index 0000000..05c0280 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.hive.metastore.columnstats.aggr;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
import org.apache.hadoop.hive.metastore.StatObjectConverter;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Aggregates decimal column statistics across partitions. Decimal low/high values
 * are compared via {@code MetaStoreUtils.decimalToDouble}. When every partition
 * carries a mergeable NDV bit vector the vectors are merged; otherwise NDV is
 * interpolated between the max and the sum of per-partition NDVs (or derived from
 * the value density), and missing partitions are handled by linear extrapolation.
 */
public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements
    IExtrapolatePartStatus {

  private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsAggregator.class);

  /**
   * Merges per-partition decimal statistics for one column into a single object.
   *
   * @param colName   name of the column being aggregated
   * @param partNames all partitions stats were requested for
   * @param css       per-partition stats, one ColumnStatisticsObj each
   * @throws MetaException if any entry carries more than one stats object
   */
  @Override
  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
      List<ColumnStatistics> css) throws MetaException {
    ColumnStatisticsObj statsObj = null;

    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
    // bitvectors
    boolean doAllPartitionContainStats = partNames.size() == css.size();
    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
    NumDistinctValueEstimator ndvEstimator = null;
    String colType = null;
    for (ColumnStatistics cs : css) {
      if (cs.getStatsObjSize() != 1) {
        throw new MetaException(
            "The number of columns should be exactly one in aggrStats, but found "
                + cs.getStatsObjSize());
      }
      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
      if (statsObj == null) {
        // Lazily create the result object from the first partition's type info.
        colType = cso.getColType();
        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
            .getStatsData().getSetField());
      }
      DecimalColumnStatsDataInspector decimalColumnStatsData =
          (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats();
      if (decimalColumnStatsData.getNdvEstimator() == null) {
        // A partition without a bit vector forces the non-bitvector path.
        ndvEstimator = null;
        break;
      } else {
        // check if all of the bit vectors can merge
        NumDistinctValueEstimator estimator = decimalColumnStatsData.getNdvEstimator();
        if (ndvEstimator == null) {
          ndvEstimator = estimator;
        } else {
          if (ndvEstimator.canMerge(estimator)) {
            continue;
          } else {
            ndvEstimator = null;
            break;
          }
        }
      }
    }
    if (ndvEstimator != null) {
      // Start from an empty estimator of the same shape; partitions are merged
      // into it below without mutating the inputs.
      ndvEstimator = NumDistinctValueEstimatorFactory
          .getEmptyNumDistinctValueEstimator(ndvEstimator);
    }
    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    if (doAllPartitionContainStats || css.size() < 2) {
      // No extrapolation needed: merge all partitions directly.
      DecimalColumnStatsDataInspector aggregateData = null;
      long lowerBound = 0;  // max of per-partition NDVs
      long higherBound = 0; // sum of per-partition NDVs
      double densityAvgSum = 0.0;
      for (ColumnStatistics cs : css) {
        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
        DecimalColumnStatsDataInspector newData =
            (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats();
        lowerBound = Math.max(lowerBound, newData.getNumDVs());
        higherBound += newData.getNumDVs();
        // per-partition density = (high - low) / NDV, accumulated as double
        densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils
            .decimalToDouble(newData.getLowValue())) / newData.getNumDVs();
        if (ndvEstimator != null) {
          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
        }
        if (aggregateData == null) {
          aggregateData = newData.deepCopy();
        } else {
          // Keep the smaller low value. NOTE(review): the "keep" branch calls
          // setLowValue(getLowValue()), a deliberate no-op preserving the
          // current value; only the else branch actually replaces it.
          if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < MetaStoreUtils
              .decimalToDouble(newData.getLowValue())) {
            aggregateData.setLowValue(aggregateData.getLowValue());
          } else {
            aggregateData.setLowValue(newData.getLowValue());
          }
          // Keep the larger high value (same no-op pattern as above).
          if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > MetaStoreUtils
              .decimalToDouble(newData.getHighValue())) {
            aggregateData.setHighValue(aggregateData.getHighValue());
          } else {
            aggregateData.setHighValue(newData.getHighValue());
          }
          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
        }
      }
      if (ndvEstimator != null) {
        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
        // use uniform distribution assumption because we can merge bitvectors
        // to get a good estimation.
        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
      } else {
        long estimation;
        if (useDensityFunctionForNDVEstimation) {
          // We have estimation, lowerbound and higherbound. We use estimation
          // if it is between lowerbound and higherbound.
          double densityAvg = densityAvgSum / partNames.size();
          estimation = (long) ((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
              .decimalToDouble(aggregateData.getLowValue())) / densityAvg);
          if (estimation < lowerBound) {
            estimation = lowerBound;
          } else if (estimation > higherBound) {
            estimation = higherBound;
          }
        } else {
          // Interpolate between max (ndvTuner=0) and sum (ndvTuner=1) of NDVs.
          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
        }
        aggregateData.setNumDVs(estimation);
      }
      columnStatisticsData.setDecimalStats(aggregateData);
    } else {
      // we need extrapolation
      LOG.debug("start extrapolation for " + colName);
      Map<String, Integer> indexMap = new HashMap<>();
      for (int index = 0; index < partNames.size(); index++) {
        indexMap.put(partNames.get(index), index);
      }
      Map<String, Double> adjustedIndexMap = new HashMap<>();
      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
      // while we scan the css, we also get the densityAvg, lowerbound and
      // higerbound when useDensityFunctionForNDVEstimation is true.
      double densityAvgSum = 0.0;
      if (ndvEstimator == null) {
        // if not every partition uses bitvector for ndv, we just fall back to
        // the traditional extrapolation methods.
        for (ColumnStatistics cs : css) {
          String partName = cs.getStatsDesc().getPartName();
          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
          DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
          if (useDensityFunctionForNDVEstimation) {
            densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils
                .decimalToDouble(newData.getLowValue())) / newData.getNumDVs();
          }
          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
          adjustedStatsMap.put(partName, cso.getStatsData());
        }
      } else {
        // we first merge all the adjacent bitvectors that we could merge and
        // derive new partition names and index.
        StringBuilder pseudoPartName = new StringBuilder();
        double pseudoIndexSum = 0;
        int length = 0;
        int curIndex = -1;
        DecimalColumnStatsDataInspector aggregateData = null;
        for (ColumnStatistics cs : css) {
          String partName = cs.getStatsDesc().getPartName();
          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
          DecimalColumnStatsDataInspector newData =
              (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats();
          // newData.isSetBitVectors() should be true for sure because we
          // already checked it before.
          if (indexMap.get(partName) != curIndex) {
            // There is bitvector, but it is not adjacent to the previous ones.
            if (length > 0) {
              // we have to set ndv: close out the current pseudo-partition.
              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
              ColumnStatisticsData csd = new ColumnStatisticsData();
              csd.setDecimalStats(aggregateData);
              adjustedStatsMap.put(pseudoPartName.toString(), csd);
              if (useDensityFunctionForNDVEstimation) {
                densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
                    .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs();
              }
              // reset everything
              pseudoPartName = new StringBuilder();
              pseudoIndexSum = 0;
              length = 0;
              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
            }
            aggregateData = null;
          }
          curIndex = indexMap.get(partName);
          pseudoPartName.append(partName);
          pseudoIndexSum += curIndex;
          length++;
          curIndex++;
          if (aggregateData == null) {
            aggregateData = newData.deepCopy();
          } else {
            // Same keep-min / keep-max no-op pattern as in the direct-merge path.
            if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < MetaStoreUtils
                .decimalToDouble(newData.getLowValue())) {
              aggregateData.setLowValue(aggregateData.getLowValue());
            } else {
              aggregateData.setLowValue(newData.getLowValue());
            }
            if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > MetaStoreUtils
                .decimalToDouble(newData.getHighValue())) {
              aggregateData.setHighValue(aggregateData.getHighValue());
            } else {
              aggregateData.setHighValue(newData.getHighValue());
            }
            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
          }
          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
        }
        if (length > 0) {
          // we have to set ndv: close out the final pseudo-partition.
          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
          ColumnStatisticsData csd = new ColumnStatisticsData();
          csd.setDecimalStats(aggregateData);
          adjustedStatsMap.put(pseudoPartName.toString(), csd);
          if (useDensityFunctionForNDVEstimation) {
            densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
                .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs();
          }
        }
      }
      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
    }
    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName,
        columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), css.size());
    statsObj.setStatsData(columnStatisticsData);
    return statsObj;
  }

  /**
   * Linearly extrapolates low/high/NDV from the partitions (or pseudo-partitions)
   * that have stats to all {@code numParts} partitions, using the adjusted
   * partition index as the extrapolation axis; null counts are scaled
   * proportionally. Decimal endpoints are converted to double for the math and
   * back to thrift decimals via StatObjectConverter.createThriftDecimal.
   */
  @Override
  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
    int rightBorderInd = numParts;
    DecimalColumnStatsDataInspector extrapolateDecimalData = new DecimalColumnStatsDataInspector();
    Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats());
    }
    List<Map.Entry<String, DecimalColumnStatsData>> list = new LinkedList<>(
        extractedAdjustedStatsMap.entrySet());
    // get the lowValue
    Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
      @Override
      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
          Map.Entry<String, DecimalColumnStatsData> o2) {
        return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
      }
    });
    double minInd = adjustedIndexMap.get(list.get(0).getKey());
    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    double lowValue = 0;
    double min = MetaStoreUtils.decimalToDouble(list.get(0).getValue().getLowValue());
    double max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 1).getValue().getLowValue());
    if (minInd == maxInd) {
      lowValue = min;
    } else if (minInd < maxInd) {
      // left border is the min
      lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
    } else {
      // right border is the min
      lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
    }

    // get the highValue
    Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
      @Override
      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
          Map.Entry<String, DecimalColumnStatsData> o2) {
        return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
      }
    });
    minInd = adjustedIndexMap.get(list.get(0).getKey());
    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    double highValue = 0;
    min = MetaStoreUtils.decimalToDouble(list.get(0).getValue().getHighValue());
    max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 1).getValue().getHighValue());
    if (minInd == maxInd) {
      highValue = min;
    } else if (minInd < maxInd) {
      // right border is the max
      highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    } else {
      // left border is the max
      highValue = (min + (max - min) * minInd / (minInd - maxInd));
    }

    // get the #nulls
    long numNulls = 0;
    for (Map.Entry<String, DecimalColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
      numNulls += entry.getValue().getNumNulls();
    }
    // we scale up sumNulls based on the number of partitions
    numNulls = numNulls * numParts / numPartsWithStats;

    // get the ndv
    long ndv = 0;
    long ndvMin = 0;
    long ndvMax = 0;
    Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
      @Override
      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
          Map.Entry<String, DecimalColumnStatsData> o2) {
        return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs());
      }
    });
    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
    long higherBound = 0;
    for (Map.Entry<String, DecimalColumnStatsData> entry : list) {
      higherBound += entry.getValue().getNumDVs();
    }
    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
      // density-based estimate, clamped to [max NDV, sum of NDVs]
      ndv = (long) ((highValue - lowValue) / densityAvg);
      if (ndv < lowerBound) {
        ndv = lowerBound;
      } else if (ndv > higherBound) {
        ndv = higherBound;
      }
    } else {
      minInd = adjustedIndexMap.get(list.get(0).getKey());
      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
      ndvMin = list.get(0).getValue().getNumDVs();
      ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
      if (minInd == maxInd) {
        ndv = ndvMin;
      } else if (minInd < maxInd) {
        // right border is the max
        ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
      } else {
        // left border is the max
        ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
      }
    }
    extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String
        .valueOf(lowValue)));
    extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String
        .valueOf(highValue)));
    extrapolateDecimalData.setNumNulls(numNulls);
    extrapolateDecimalData.setNumDVs(ndv);
    extrapolateData.setDecimalStats(extrapolateDecimalData);
  }
}
