HIVE-17629 : CachedStore - wait for prewarm at use time, not init time (Sergey Shelukhin, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/842d4dfc Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/842d4dfc Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/842d4dfc Branch: refs/heads/master Commit: 842d4dfc891dc568159a6a92dbbdd4f18e0c9dd0 Parents: 24d125b Author: sergey <[email protected]> Authored: Mon Oct 9 15:24:50 2017 -0700 Committer: sergey <[email protected]> Committed: Mon Oct 9 15:24:50 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/metastore/HiveMetaStore.java | 4 + .../hadoop/hive/metastore/cache/CacheUtils.java | 8 +- .../hive/metastore/cache/CachedStore.java | 545 ++++++++++++++----- .../hive/metastore/cache/SharedCache.java | 124 ++--- .../hive/metastore/cache/TestCachedStore.java | 113 ++-- .../apache/hive/service/server/HiveServer2.java | 8 + 6 files changed, 549 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/842d4dfc/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 5617e1c..f8b79a0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent; +import org.apache.hadoop.hive.metastore.cache.CachedStore; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import 
org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent; @@ -7629,6 +7630,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { } }); + // This will only initialize the cache if configured. + CachedStore.initSharedCacheAsync(conf); + //Start Metrics for Standalone (Remote) Mode if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) { try { http://git-wip-us.apache.org/repos/asf/hive/blob/842d4dfc/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index 280655d..aaeb6d4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -88,10 +88,10 @@ public class CacheUtils { return result; } - public static Table assemble(TableWrapper wrapper) { + static Table assemble(TableWrapper wrapper, SharedCache sharedCache) { Table t = wrapper.getTable().deepCopy(); if (wrapper.getSdHash()!=null) { - StorageDescriptor sdCopy = SharedCache.getSdFromCache(wrapper.getSdHash()).deepCopy(); + StorageDescriptor sdCopy = sharedCache.getSdFromCache(wrapper.getSdHash()).deepCopy(); if (sdCopy.getBucketCols()==null) { sdCopy.setBucketCols(new ArrayList<String>()); } @@ -109,10 +109,10 @@ public class CacheUtils { return t; } - public static Partition assemble(PartitionWrapper wrapper) { + static Partition assemble(PartitionWrapper wrapper, SharedCache sharedCache) { Partition p = wrapper.getPartition().deepCopy(); if (wrapper.getSdHash()!=null) { - StorageDescriptor sdCopy = SharedCache.getSdFromCache(wrapper.getSdHash()).deepCopy(); + StorageDescriptor sdCopy = sharedCache.getSdFromCache(wrapper.getSdHash()).deepCopy(); if (sdCopy.getBucketCols()==null) { sdCopy.setBucketCols(new ArrayList<String>()); } 
http://git-wip-us.apache.org/repos/asf/hive/blob/842d4dfc/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 7939bfe..edc8e14 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Deadline; import org.apache.hadoop.hive.metastore.FileMetadataHandler; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -84,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.common.util.HiveStringUtils; import org.apache.thrift.TException; @@ -120,8 +122,11 @@ public class CachedStore implements RawStore, Configurable { Configuration conf; private PartitionExpressionProxy expressionProxy = null; // Default value set to 100 milliseconds for test purpose - private long cacheRefreshPeriod = 100; - static boolean firstTime = true; + private static long cacheRefreshPeriod = 100; + + /** A wrapper over SharedCache. Allows one to get SharedCache safely; should be merged + * into SharedCache itself (see the TODO on the class). 
*/ + private static final SharedCacheWrapper sharedCacheWrapper = new SharedCacheWrapper(); static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); @@ -193,6 +198,22 @@ public class CachedStore implements RawStore, Configurable { public CachedStore() { } + public static void initSharedCacheAsync(HiveConf conf) { + String clazzName = null; + boolean isEnabled = false; + try { + clazzName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL); + isEnabled = MetaStoreUtils.getClass(clazzName).isAssignableFrom(CachedStore.class); + } catch (MetaException e) { + LOG.error("Cannot instantiate metastore class", e); + } + if (!isEnabled) { + LOG.debug("CachedStore is not enabled; using " + clazzName); + return; + } + sharedCacheWrapper.startInit(conf); + } + @Override public void setConf(Configuration conf) { String rawStoreClassName = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, @@ -214,38 +235,27 @@ public class CachedStore implements RawStore, Configurable { if (expressionProxy == null || conf != oldConf) { expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); } - if (firstTime) { - try { - LOG.info("Prewarming CachedStore"); - prewarm(); - LOG.info("CachedStore initialized"); - // Start the cache update master-worker threads - startCacheUpdateService(); - } catch (Exception e) { - throw new RuntimeException(e); - } - firstTime = false; - } } @VisibleForTesting - void prewarm() throws Exception { + static void prewarm(RawStore rawStore) throws Exception { // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy Deadline.registerIfNot(1000000); List<String> dbNames = rawStore.getAllDatabases(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); for (String dbName : dbNames) { Database db = rawStore.getDatabase(dbName); - SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(dbName), db); + 
sharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(dbName), db); List<String> tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { Table table = rawStore.getTable(dbName, tblName); - SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), table); Deadline.startTimer("getPartitions"); List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); for (Partition partition : partitions) { - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partition); } // Cache partition column stats @@ -254,7 +264,7 @@ public class CachedStore implements RawStore, Configurable { rawStore.getColStatsForTablePartitions(dbName, tblName); Deadline.stopTimer(); if (colStatsPerPartition != null) { - SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); + sharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); } // Cache table column stats List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); @@ -263,7 +273,7 @@ public class CachedStore implements RawStore, Configurable { rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); } } @@ -271,7 +281,7 @@ public class CachedStore implements RawStore, Configurable { } @VisibleForTesting - synchronized void startCacheUpdateService() { + synchronized static void 
startCacheUpdateService(Configuration conf) { if (cacheUpdateMaster == null) { cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override @@ -289,13 +299,13 @@ public class CachedStore implements RawStore, Configurable { TimeUnit.MILLISECONDS); } LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); - cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod, + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod, TimeUnit.MILLISECONDS); } } @VisibleForTesting - synchronized boolean stopCacheUpdateService(long timeout) { + synchronized static boolean stopCacheUpdateService(long timeout) { boolean tasksStoppedBeforeShutdown = false; if (cacheUpdateMaster != null) { LOG.info("CachedStore: shutting down cache update service"); @@ -313,31 +323,55 @@ public class CachedStore implements RawStore, Configurable { } @VisibleForTesting - void setCacheRefreshPeriod(long time) { - this.cacheRefreshPeriod = time; + static void setCacheRefreshPeriod(long time) { + cacheRefreshPeriod = time; } static class CacheUpdateMasterWork implements Runnable { + private boolean isFirstRun = true; + private final RawStore rawStore; - private final CachedStore cachedStore; - - public CacheUpdateMasterWork(CachedStore cachedStore) { - this.cachedStore = cachedStore; + public CacheUpdateMasterWork(Configuration conf) { + String rawStoreClassName = HiveConf.getVar(conf, + ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); + try { + rawStore = ((Class<? extends RawStore>) MetaStoreUtils.getClass( + rawStoreClassName)).newInstance(); + rawStore.setConf(conf); + } catch (InstantiationException | IllegalAccessException | MetaException e) { + // MetaException here really means ClassNotFound (see the utility method). + // So, if any of these happen, that means we can never succeed. 
+ sharedCacheWrapper.updateInitState(e, true); + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); + } } @Override public void run() { - // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + if (isFirstRun) { + while (isFirstRun) { + try { + LOG.info("Prewarming CachedStore"); + prewarm(rawStore); + LOG.info("CachedStore initialized"); + } catch (Exception e) { + LOG.error("Prewarm failure", e); + sharedCacheWrapper.updateInitState(e, false); + return; + } + sharedCacheWrapper.updateInitState(null, false); + isFirstRun = false; + } + } else { + // TODO: prewarm and update can probably be merged. + update(); + } + } + + public void update() { Deadline.registerIfNot(1000000); LOG.debug("CachedStore: updating cached objects"); - String rawStoreClassName = - HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, - ObjectStore.class.getName()); - RawStore rawStore = null; try { - rawStore = - ((Class<? 
extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)).newInstance(); - rawStore.setConf(cachedStore.conf); List<String> dbNames = rawStore.getAllDatabases(); if (dbNames != null) { // Update the database in cache @@ -345,7 +379,7 @@ public class CachedStore implements RawStore, Configurable { for (String dbName : dbNames) { // Update the tables in cache updateTables(rawStore, dbName); - List<String> tblNames = cachedStore.getAllTables(dbName); + List<String> tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); for (String tblName : tblNames) { // Update the partitions for a table in cache updateTablePartitions(rawStore, dbName, tblName); @@ -356,18 +390,8 @@ public class CachedStore implements RawStore, Configurable { } } } - } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); } catch (Exception e) { - LOG.error("Updating CachedStore: error happen when refresh", e); - } finally { - try { - if (rawStore != null) { - rawStore.shutdown(); - } - } catch (Exception e) { - LOG.error("Error shutting down RawStore", e); - } + LOG.error("Updating CachedStore: error happen when refresh; ignoring", e); } } @@ -391,7 +415,7 @@ public class CachedStore implements RawStore, Configurable { LOG.debug("Skipping database cache update; the database list we have is dirty."); return; } - SharedCache.refreshDatabases(databases); + sharedCacheWrapper.getUnsafe().refreshDatabases(databases); } } finally { if (databaseCacheLock.isWriteLockedByCurrentThread()) { @@ -417,7 +441,7 @@ public class CachedStore implements RawStore, Configurable { LOG.debug("Skipping table cache update; the table list we have is dirty."); return; } - SharedCache.refreshTables(dbName, tables); + sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables); } } catch (MetaException e) { LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e); @@ -440,7 +464,8 @@ public class 
CachedStore implements RawStore, Configurable { LOG.debug("Skipping partition cache update; the partition list we have is dirty."); return; } - SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName), + sharedCacheWrapper.getUnsafe().refreshPartitions( + HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partitions); } } catch (MetaException | NoSuchObjectException e) { @@ -469,7 +494,8 @@ public class CachedStore implements RawStore, Configurable { + "have is dirty."); return; } - SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName), + sharedCacheWrapper.getUnsafe().refreshTableColStats( + HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); } } @@ -497,7 +523,8 @@ public class CachedStore implements RawStore, Configurable { + "list we have is dirty."); return; } - SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName), + sharedCacheWrapper.getUnsafe().refreshPartitionColStats( + HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition); } } @@ -545,11 +572,13 @@ public class CachedStore implements RawStore, Configurable { @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return; try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); isDatabaseCacheDirty.set(true); - SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), + sharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy()); } finally { databaseCacheLock.readLock().unlock(); @@ -558,7 +587,16 @@ public class CachedStore implements RawStore, Configurable { @Override public Database getDatabase(String dbName) throws 
NoSuchObjectException { - Database db = SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + SharedCache sharedCache; + try { + sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getDatabase(dbName); + } + } catch (MetaException e) { + throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx? + } + Database db = sharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); if (db == null) { throw new NoSuchObjectException(); } @@ -569,11 +607,13 @@ public class CachedStore implements RawStore, Configurable { public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { boolean succ = rawStore.dropDatabase(dbname); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); isDatabaseCacheDirty.set(true); - SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); + sharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); } finally { databaseCacheLock.readLock().unlock(); } @@ -586,11 +626,13 @@ public class CachedStore implements RawStore, Configurable { MetaException { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); isDatabaseCacheDirty.set(true); - SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); + sharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); } finally { databaseCacheLock.readLock().unlock(); } @@ -600,8 +642,12 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getDatabases(String pattern) throws MetaException { + SharedCache sharedCache = 
sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getDatabases(pattern); + } List<String> results = new ArrayList<String>(); - for (String dbName : SharedCache.listCachedDatabases()) { + for (String dbName : sharedCache.listCachedDatabases()) { dbName = HiveStringUtils.normalizeIdentifier(dbName); if (CacheUtils.matches(dbName, pattern)) { results.add(dbName); @@ -612,7 +658,11 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getAllDatabases() throws MetaException { - return SharedCache.listCachedDatabases(); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getAllDatabases(); + } + return sharedCache.listCachedDatabases(); } @Override @@ -652,11 +702,13 @@ public class CachedStore implements RawStore, Configurable { public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); validateTableType(tbl); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return; try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), + sharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); } finally { tableCacheLock.readLock().unlock(); @@ -668,12 +720,14 @@ public class CachedStore implements RawStore, Configurable { NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(dbName, tableName); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; // Remove table try { // Wait if background table cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + 
sharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName)); } finally { tableCacheLock.readLock().unlock(); @@ -683,7 +737,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background table col stats cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName)); } finally { tableColStatsCacheLock.readLock().unlock(); @@ -694,7 +748,11 @@ public class CachedStore implements RawStore, Configurable { @Override public Table getTable(String dbName, String tableName) throws MetaException { - Table tbl = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTable(dbName, tableName); + } + Table tbl = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName)); if (tbl != null) { tbl.unsetPrivileges(); @@ -707,11 +765,13 @@ public class CachedStore implements RawStore, Configurable { public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), + sharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), HiveStringUtils.normalizeIdentifier(part.getTableName()), part); } finally { partitionCacheLock.readLock().unlock(); @@ 
-725,12 +785,14 @@ public class CachedStore implements RawStore, Configurable { throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); for (Partition part : parts) { - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), part); } } finally { @@ -745,6 +807,8 @@ public class CachedStore implements RawStore, Configurable { boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -752,7 +816,7 @@ public class CachedStore implements RawStore, Configurable { PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); while (iterator.hasNext()) { Partition part = iterator.next(); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), part); } } finally { @@ -765,8 +829,12 @@ public class CachedStore implements RawStore, Configurable { @Override public Partition getPartition(String dbName, String tableName, List<String> part_vals) throws MetaException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartition(dbName, tableName, part_vals); + } Partition part = - 
SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), part_vals); if (part != null) { part.unsetPrivileges(); @@ -779,7 +847,11 @@ public class CachedStore implements RawStore, Configurable { @Override public boolean doesPartitionExist(String dbName, String tableName, List<String> part_vals) throws MetaException, NoSuchObjectException { - return SharedCache.existPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.doesPartitionExist(dbName, tableName, part_vals); + } + return sharedCache.existPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), part_vals); } @@ -788,12 +860,14 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; // Remove partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), part_vals); } finally { partitionCacheLock.readLock().unlock(); @@ -803,7 +877,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + 
sharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), part_vals); } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -815,7 +889,11 @@ public class CachedStore implements RawStore, Configurable { @Override public List<Partition> getPartitions(String dbName, String tableName, int max) throws MetaException, NoSuchObjectException { - List<Partition> parts = SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartitions(dbName, tableName, max); + } + List<Partition> parts = sharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), max); if (parts != null) { for (Partition part : parts) { @@ -830,12 +908,14 @@ public class CachedStore implements RawStore, Configurable { throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); validateTableType(newTable); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return; // Update table cache try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), newTable); } finally { tableCacheLock.readLock().unlock(); @@ -846,7 +926,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), 
newTable); } finally { partitionCacheLock.readLock().unlock(); @@ -856,8 +936,12 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String dbName, String pattern) throws MetaException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTables(dbName, pattern); + } List<String> tableNames = new ArrayList<String>(); - for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + for (Table table : sharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { if (CacheUtils.matches(table.getTableName(), pattern)) { tableNames.add(table.getTableName()); } @@ -868,8 +952,12 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String dbName, String pattern, TableType tableType) throws MetaException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTables(dbName, pattern); + } List<String> tableNames = new ArrayList<String>(); - for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + for (Table table : sharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { if (CacheUtils.matches(table.getTableName(), pattern) && table.getTableType().equals(tableType.toString())) { tableNames.add(table.getTableName()); @@ -881,16 +969,24 @@ public class CachedStore implements RawStore, Configurable { @Override public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) throws MetaException { - return SharedCache.getTableMeta(HiveStringUtils.normalizeIdentifier(dbNames), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTableMeta(dbNames, tableNames, tableTypes); + } + return sharedCache.getTableMeta(HiveStringUtils.normalizeIdentifier(dbNames), 
HiveStringUtils.normalizeIdentifier(tableNames), tableTypes); } @Override public List<Table> getTableObjectsByName(String dbName, List<String> tblNames) throws MetaException, UnknownDBException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTableObjectsByName(dbName, tblNames); + } List<Table> tables = new ArrayList<Table>(); for (String tblName : tblNames) { - tables.add(SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + tables.add(sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName))); } return tables; @@ -898,8 +994,16 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getAllTables(String dbName) throws MetaException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getAllTables(dbName); + } + return getAllTablesInternal(dbName, sharedCache); + } + + private static List<String> getAllTablesInternal(String dbName, SharedCache sharedCache) { List<String> tblNames = new ArrayList<String>(); - for (Table tbl : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + for (Table tbl : sharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { tblNames.add(HiveStringUtils.normalizeIdentifier(tbl.getTableName())); } return tblNames; @@ -908,9 +1012,13 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> listTableNamesByFilter(String dbName, String filter, short max_tables) throws MetaException, UnknownDBException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.listTableNamesByFilter(dbName, filter, max_tables); + } List<String> tableNames = new ArrayList<String>(); int count = 0; - for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + for (Table 
table : sharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { if (CacheUtils.matches(table.getTableName(), filter) && (max_tables == -1 || count < max_tables)) { tableNames.add(table.getTableName()); @@ -923,11 +1031,15 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> listPartitionNames(String dbName, String tblName, short max_parts) throws MetaException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.listPartitionNames(dbName, tblName, max_parts); + } List<String> partitionNames = new ArrayList<String>(); - Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Table t = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); int count = 0; - for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + for (Partition part : sharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), max_parts)) { if (max_parts == -1 || count < max_parts) { partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); @@ -955,11 +1067,13 @@ public class CachedStore implements RawStore, Configurable { throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); // Update partition cache + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); } finally { partitionCacheLock.readLock().unlock(); @@ -969,7 +1083,7 @@ 
public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -981,6 +1095,8 @@ public class CachedStore implements RawStore, Configurable { List<Partition> newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); // Update partition cache + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -988,7 +1104,7 @@ public class CachedStore implements RawStore, Configurable { for (int i = 0; i < partValsList.size(); i++) { List<String> partVals = partValsList.get(i); Partition newPart = newParts.get(i); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); } } finally { @@ -1002,7 +1118,7 @@ public class CachedStore implements RawStore, Configurable { for (int i = 0; i < partValsList.size(); i++) { List<String> partVals = partValsList.get(i); Partition newPart = newParts.get(i); - SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); } } finally { @@ -1047,8 +1163,9 @@ public class CachedStore implements RawStore, Configurable { } private boolean getPartitionNamesPrunedByExprNoTxn(Table 
table, byte[] expr, - String defaultPartName, short maxParts, List<String> result) throws MetaException, NoSuchObjectException { - List<Partition> parts = SharedCache.listCachedPartitions( + String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache) + throws MetaException, NoSuchObjectException { + List<Partition> parts = sharedCache.listCachedPartitions( HiveStringUtils.normalizeIdentifier(table.getDbName()), HiveStringUtils.normalizeIdentifier(table.getTableName()), maxParts); for (Partition part : parts) { @@ -1057,7 +1174,8 @@ public class CachedStore implements RawStore, Configurable { if (defaultPartName == null || defaultPartName.isEmpty()) { defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME); } - return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, result); + return expressionProxy.filterPartitionsByExpr( + table.getPartitionKeys(), expr, defaultPartName, result); } @Override @@ -1072,12 +1190,18 @@ public class CachedStore implements RawStore, Configurable { public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, String defaultPartitionName, short maxParts, List<Partition> result) throws TException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartitionsByExpr( + dbName, tblName, expr, defaultPartitionName, maxParts, result); + } List<String> partNames = new LinkedList<String>(); - Table table = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); + Table table = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartitionName, maxParts, partNames); + table, expr, defaultPartitionName, maxParts, partNames, sharedCache); for (String partName : 
partNames) { - Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Partition part = sharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); part.unsetPrivileges(); result.add(part); @@ -1088,24 +1212,33 @@ public class CachedStore implements RawStore, Configurable { @Override public int getNumPartitionsByFilter(String dbName, String tblName, String filter) throws MetaException, NoSuchObjectException { - Table table = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName)); // TODO filter -> expr - return 0; + // SharedCache sharedCache = sharedCacheWrapper.get(); + // if (sharedCache == null) { + return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); + // } + // Table table = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + // HiveStringUtils.normalizeIdentifier(tblName)); + // return 0; } @Override public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) throws MetaException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); + } String defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME); List<String> partNames = new LinkedList<String>(); - Table table = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Table table = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); - getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames); + getPartitionNamesPrunedByExprNoTxn( + table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache); return partNames.size(); } - public static List<String> partNameToVals(String 
name) { + private static List<String> partNameToVals(String name) { if (name == null) return null; List<String> vals = new ArrayList<String>(); String[] kvp = name.split("/"); @@ -1118,9 +1251,13 @@ public class CachedStore implements RawStore, Configurable { @Override public List<Partition> getPartitionsByNames(String dbName, String tblName, List<String> partNames) throws MetaException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartitionsByNames(dbName, tblName, partNames); + } List<Partition> partitions = new ArrayList<Partition>(); for (String partName : partNames) { - Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Partition part = sharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); if (part!=null) { partitions.add(part); @@ -1289,10 +1426,14 @@ public class CachedStore implements RawStore, Configurable { public Partition getPartitionWithAuth(String dbName, String tblName, List<String> partVals, String userName, List<String> groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { - Partition p = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); + } + Partition p = sharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partVals); if (p!=null) { - Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Table t = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); 
PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, @@ -1306,11 +1447,15 @@ public class CachedStore implements RawStore, Configurable { public List<Partition> getPartitionsWithAuth(String dbName, String tblName, short maxParts, String userName, List<String> groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { - Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); + } + Table t = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); List<Partition> partitions = new ArrayList<Partition>(); int count = 0; - for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + for (Partition part : sharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), maxParts)) { if (maxParts == -1 || count < maxParts) { String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); @@ -1328,11 +1473,15 @@ public class CachedStore implements RawStore, Configurable { public List<String> listPartitionNamesPs(String dbName, String tblName, List<String> partVals, short maxParts) throws MetaException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); + } List<String> partNames = new ArrayList<String>(); int count = 0; - Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Table t = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); - for (Partition part : 
SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + for (Partition part : sharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), maxParts)) { boolean psMatch = true; for (int i=0;i<partVals.size();i++) { @@ -1359,11 +1508,16 @@ public class CachedStore implements RawStore, Configurable { String tblName, List<String> partVals, short maxParts, String userName, List<String> groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.listPartitionsPsWithAuth( + dbName, tblName, partVals, maxParts, userName, groupNames); + } List<Partition> partitions = new ArrayList<Partition>(); - Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + Table t = sharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); int count = 0; - for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + for (Partition part : sharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), maxParts)) { boolean psMatch = true; for (int i=0;i<partVals.size();i++) { @@ -1393,6 +1547,8 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.updateTableColumnStatistics(colStats); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; String dbName = colStats.getStatsDesc().getDbName(); String tableName = colStats.getStatsDesc().getTableName(); List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); @@ -1408,7 +1564,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache 
update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), tbl); } finally { tableCacheLock.readLock().unlock(); @@ -1419,7 +1575,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), statsObjs); } finally { tableColStatsCacheLock.readLock().unlock(); @@ -1431,13 +1587,17 @@ public class CachedStore implements RawStore, Configurable { @Override public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, List<String> colNames) throws MetaException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTableColumnStatistics(dbName, tableName, colNames); + } ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>(); for (String colName : colNames) { String colStatsCacheKey = CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), colName); - ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey); + ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); if (colStat != null) { colStatObjs.add(colStat); } @@ -1454,11 +1614,13 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = 
rawStore.deleteTableColumnStatistics(dbName, tableName, colName); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), colName); } finally { tableColStatsCacheLock.readLock().unlock(); @@ -1472,6 +1634,8 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; String dbName = colStats.getStatsDesc().getDbName(); String tableName = colStats.getStatsDesc().getTableName(); List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); @@ -1487,7 +1651,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), partVals, part); } finally { partitionCacheLock.readLock().unlock(); @@ -1498,7 +1662,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.updatePartitionColStatsInCache( + sharedCache.updatePartitionColStatsInCache( HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), 
HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj()); @@ -1524,11 +1688,13 @@ public class CachedStore implements RawStore, Configurable { boolean succ = rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); if (succ) { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return true; try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), partVals, colName); } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1538,17 +1704,21 @@ public class CachedStore implements RawStore, Configurable { } @Override - public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames, - List<String> colNames) throws MetaException, NoSuchObjectException { - List<ColumnStatisticsObj> colStats = mergeColStatsForPartitions( - HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), - partNames, colNames); - return new AggrStats(colStats, partNames.size()); - - } + public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames, + List<String> colNames) throws MetaException, NoSuchObjectException { + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + } + List<ColumnStatisticsObj> colStats = mergeColStatsForPartitions( + HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), + partNames, colNames, sharedCache); + return new AggrStats(colStats, partNames.size()); + } private List<ColumnStatisticsObj> 
mergeColStatsForPartitions(String dbName, String tblName, - List<String> partNames, List<String> colNames) throws MetaException { + List<String> partNames, List<String> colNames, SharedCache sharedCache) + throws MetaException { final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); final double ndvTuner = HiveConf.getFloatVar(getConf(), @@ -1561,7 +1731,7 @@ public class CachedStore implements RawStore, Configurable { String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); List<ColumnStatisticsObj> colStat = new ArrayList<>(); - ColumnStatisticsObj colStatsForPart = SharedCache + ColumnStatisticsObj colStatsForPart = sharedCache .getCachedPartitionColStats(colStatsCacheKey); if (colStatsForPart != null) { colStat.add(colStatsForPart); @@ -1649,6 +1819,8 @@ public class CachedStore implements RawStore, Configurable { public void dropPartitions(String dbName, String tblName, List<String> partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(dbName, tblName, partNames); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return; // Remove partitions try { // Wait if background cache update is happening @@ -1656,7 +1828,7 @@ public class CachedStore implements RawStore, Configurable { isPartitionCacheDirty.set(true); for (String partName : partNames) { List<String> vals = partNameToVals(partName); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), vals); } } finally { @@ -1669,7 +1841,7 @@ public class CachedStore implements RawStore, Configurable { isPartitionColStatsCacheDirty.set(true); for (String partName : partNames) { List<String> part_vals = partNameToVals(partName); - 
SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + sharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), part_vals); } } finally { @@ -1843,17 +2015,29 @@ public class CachedStore implements RawStore, Configurable { @Override public int getTableCount() throws MetaException { - return SharedCache.getCachedTableCount(); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getTableCount(); + } + return sharedCache.getCachedTableCount(); } @Override public int getPartitionCount() throws MetaException { - return SharedCache.getCachedPartitionCount(); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getPartitionCount(); + } + return sharedCache.getCachedPartitionCount(); } @Override public int getDatabaseCount() throws MetaException { - return SharedCache.getCachedDatabaseCount(); + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return rawStore.getDatabaseCount(); + } + return sharedCache.getCachedDatabaseCount(); } @Override @@ -1894,7 +2078,9 @@ public class CachedStore implements RawStore, Configurable { // TODO constraintCache List<String> constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints); - SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) return constraintNames; + sharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); return constraintNames; } @@ -1953,4 +2139,103 @@ public class CachedStore implements RawStore, Configurable { public String getMetastoreDbUuid() throws MetaException { return rawStore.getMetastoreDbUuid(); } + + // TODO: 
this is only used to hide SharedCache instance from direct use; ideally, the stuff in + // CachedStore that's specific to SharedCache (e.g. update threads) should be refactored to + // be part of this, then this could be moved out of this file (or merged with SharedCache). + private static final class SharedCacheWrapper { + private static enum InitState { + NOT_ENABLED, INITIALIZING, INITIALIZED, FAILED_FATAL + } + + private final SharedCache instance = new SharedCache(); + private final Object initLock = new Object(); + private InitState initState = InitState.NOT_ENABLED; + // We preserve the old setConf init behavior, where a failed prewarm would fail the query + // and give a chance to another query to try prewarming again. Basically, we'd increment the + // count and all the queries waiting for prewarm would fail; however, we will retry the prewarm + // again infinitely, so some queries might succeed later. + private int initFailureCount; + private Throwable lastError; + + /** + * A callback that updates the initialization state. + * @param error Error, if any. Null means the initialization has succeeded. + * @param isFatal Whether the error (if present) is fatal, or whether init will be retried. + */ + void updateInitState(Throwable error, boolean isFatal) { + boolean isSuccessful = error == null; + synchronized (initLock) { + if (isSuccessful) { + initState = InitState.INITIALIZED; + } else if (isFatal) { + initState = InitState.FAILED_FATAL; + lastError = error; + } else { + ++initFailureCount; + lastError = error; + } + initLock.notifyAll(); + } + } + + void startInit(HiveConf conf) { + LOG.info("Initializing shared cache"); + synchronized (initLock) { + assert initState == InitState.NOT_ENABLED; + initState = InitState.INITIALIZING; + } + // The first iteration of the update thread will prewarm the cache. + startCacheUpdateService(conf); + } + + /** + * Gets the SharedCache, waiting for initialization to happen if necessary. 
+ * Fails on any initialization error, even if the init will be retried later. + */ + public SharedCache get() throws MetaException { + if (!waitForInit()) return null; + return instance; + } + + /** Gets the shared cache unsafely (may not be ready to use); used by init methods. */ + SharedCache getUnsafe() { + return instance; + } + + private boolean waitForInit() throws MetaException { + synchronized (initLock) { + int localFailureCount = initFailureCount; + while (true) { + switch (initState) { + case INITIALIZED: return true; + case NOT_ENABLED: return false; + case FAILED_FATAL: { + throw new RuntimeException("CachedStore prewarm had a fatal error", lastError); + } + case INITIALIZING: { + try { + initLock.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MetaException("Interrupted"); + } + break; + } + default: throw new AssertionError(initState); + } + // Fail if any errors occurred; mimics the old behavior where a setConf prewarm failure + // would fail the current task, but cause the next setConf to try prewarm again forever. 
+ if (initFailureCount != localFailureCount) { + throw new RuntimeException("CachedStore prewarm failed", lastError); + } + } + } + } + } + + @VisibleForTesting + void setInitializedForTest() { + sharedCacheWrapper.updateInitState(null, false); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/842d4dfc/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 80b17e0..e713de0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -48,19 +48,15 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; public class SharedCache { - private static Map<String, Database> databaseCache = new TreeMap<String, Database>(); - private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>(); - private static Map<String, PartitionWrapper> partitionCache = - new TreeMap<String, PartitionWrapper>(); - private static Map<String, ColumnStatisticsObj> partitionColStatsCache = - new TreeMap<String, ColumnStatisticsObj>(); - private static Map<String, ColumnStatisticsObj> tableColStatsCache = - new TreeMap<String, ColumnStatisticsObj>(); - private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = - new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>(); + private Map<String, Database> databaseCache = new TreeMap<String, Database>(); + private Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>(); + private Map<String, PartitionWrapper> partitionCache = new TreeMap<>(); + private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>(); + private Map<String, ColumnStatisticsObj> tableColStatsCache = 
new TreeMap<>(); + private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>(); private static MessageDigest md; - static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class); static { try { @@ -70,43 +66,43 @@ public class SharedCache { } } - public static synchronized Database getDatabaseFromCache(String name) { + public synchronized Database getDatabaseFromCache(String name) { return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null; } - public static synchronized void addDatabaseToCache(String dbName, Database db) { + public synchronized void addDatabaseToCache(String dbName, Database db) { Database dbCopy = db.deepCopy(); dbCopy.setName(HiveStringUtils.normalizeIdentifier(dbName)); databaseCache.put(dbName, dbCopy); } - public static synchronized void removeDatabaseFromCache(String dbName) { + public synchronized void removeDatabaseFromCache(String dbName) { databaseCache.remove(dbName); } - public static synchronized List<String> listCachedDatabases() { + public synchronized List<String> listCachedDatabases() { return new ArrayList<String>(databaseCache.keySet()); } - public static synchronized void alterDatabaseInCache(String dbName, Database newDb) { + public synchronized void alterDatabaseInCache(String dbName, Database newDb) { removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy()); } - public static synchronized int getCachedDatabaseCount() { + public synchronized int getCachedDatabaseCount() { return databaseCache.size(); } - public static synchronized Table getTableFromCache(String dbName, String tableName) { + public synchronized Table getTableFromCache(String dbName, String tableName) { TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); if (tblWrapper == null) { return 
null; } - Table t = CacheUtils.assemble(tblWrapper); + Table t = CacheUtils.assemble(tblWrapper, this); return t; } - public static synchronized void addTableToCache(String dbName, String tblName, Table tbl) { + public synchronized void addTableToCache(String dbName, String tblName, Table tbl) { Table tblCopy = tbl.deepCopy(); tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName)); tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName)); @@ -128,7 +124,7 @@ public class SharedCache { tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); } - public static synchronized void removeTableFromCache(String dbName, String tblName) { + public synchronized void removeTableFromCache(String dbName, String tblName) { TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); byte[] sdHash = tblWrapper.getSdHash(); if (sdHash!=null) { @@ -136,11 +132,11 @@ public class SharedCache { } } - public static synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { + public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null; } - public static synchronized void removeTableColStatsFromCache(String dbName, String tblName) { + public synchronized void removeTableColStatsFromCache(String dbName, String tblName) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); Iterator<Entry<String, ColumnStatisticsObj>> iterator = tableColStatsCache.entrySet().iterator(); @@ -153,12 +149,12 @@ public class SharedCache { } } - public static synchronized void removeTableColStatsFromCache(String dbName, String tblName, + public synchronized void removeTableColStatsFromCache(String dbName, String tblName, String colName) { tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); } - public static synchronized void updateTableColStatsInCache(String dbName, 
String tableName, + public synchronized void updateTableColStatsInCache(String dbName, String tableName, List<ColumnStatisticsObj> colStatsForTable) { for (ColumnStatisticsObj colStatObj : colStatsForTable) { // Get old stats object if present @@ -176,13 +172,13 @@ public class SharedCache { } } - public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { + public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { removeTableFromCache(dbName, tblName); addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable); } - public static synchronized void alterTableInPartitionCache(String dbName, String tblName, + public synchronized void alterTableInPartitionCache(String dbName, String tblName, Table newTable) { if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { List<Partition> partitions = listCachedPartitions(dbName, tblName, -1); @@ -196,7 +192,7 @@ public class SharedCache { } } - public static synchronized void alterTableInTableColStatsCache(String dbName, String tblName, + public synchronized void alterTableInTableColStatsCache(String dbName, String tblName, Table newTable) { if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); @@ -219,7 +215,7 @@ public class SharedCache { } } - public static synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, + public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, Table newTable) { if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { List<Partition> partitions = listCachedPartitions(dbName, tblName, -1); @@ -248,21 +244,21 @@ public class SharedCache { } } - public static synchronized int getCachedTableCount() { + 
public synchronized int getCachedTableCount() { return tableCache.size(); } - public static synchronized List<Table> listCachedTables(String dbName) { + public synchronized List<Table> listCachedTables(String dbName) { List<Table> tables = new ArrayList<Table>(); for (TableWrapper wrapper : tableCache.values()) { if (wrapper.getTable().getDbName().equals(dbName)) { - tables.add(CacheUtils.assemble(wrapper)); + tables.add(CacheUtils.assemble(wrapper, this)); } } return tables; } - public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) { + public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) { List<TableMeta> tableMetas = new ArrayList<TableMeta>(); for (String dbName : listCachedDatabases()) { if (CacheUtils.matches(dbName, dbNames)) { @@ -281,7 +277,7 @@ public class SharedCache { return tableMetas; } - public static synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { + public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { Partition partCopy = part.deepCopy(); PartitionWrapper wrapper; if (part.getSd()!=null) { @@ -296,24 +292,24 @@ public class SharedCache { partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper); } - public static synchronized Partition getPartitionFromCache(String key) { + public synchronized Partition getPartitionFromCache(String key) { PartitionWrapper wrapper = partitionCache.get(key); if (wrapper == null) { return null; } - Partition p = CacheUtils.assemble(wrapper); + Partition p = CacheUtils.assemble(wrapper, this); return p; } - public static synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) { + public synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) { return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, 
part_vals)); } - public static synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) { + public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) { return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); } - public static synchronized Partition removePartitionFromCache(String dbName, String tblName, + public synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) { PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); @@ -324,7 +320,7 @@ public class SharedCache { } // Remove cached column stats for all partitions of a table - public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { + public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); Iterator<Entry<String, ColumnStatisticsObj>> iterator = partitionColStatsCache.entrySet().iterator(); @@ -338,7 +334,7 @@ public class SharedCache { } // Remove cached column stats for a particular partition of a table - public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + public synchronized void removePartitionColStatsFromCache(String dbName, String tblName, List<String> partVals) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); Iterator<Entry<String, ColumnStatisticsObj>> iterator = @@ -353,33 +349,33 @@ public class SharedCache { } // Remove cached column stats for a particular partition and a particular column of a table - public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + public synchronized void removePartitionColStatsFromCache(String dbName, String tblName, List<String> partVals, String colName) { 
partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName)); } - public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) { + public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) { List<Partition> partitions = new ArrayList<Partition>(); int count = 0; for (PartitionWrapper wrapper : partitionCache.values()) { if (wrapper.getPartition().getDbName().equals(dbName) && wrapper.getPartition().getTableName().equals(tblName) && (max == -1 || count < max)) { - partitions.add(CacheUtils.assemble(wrapper)); + partitions.add(CacheUtils.assemble(wrapper, this)); count++; } } return partitions; } - public static synchronized void alterPartitionInCache(String dbName, String tblName, + public synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) { removePartitionFromCache(dbName, tblName, partVals); addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart); } - public static synchronized void alterPartitionInColStatsCache(String dbName, String tblName, + public synchronized void alterPartitionInColStatsCache(String dbName, String tblName, List<String> partVals, Partition newPart) { String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); Map<String, ColumnStatisticsObj> newPartitionColStats = @@ -403,7 +399,7 @@ public class SharedCache { partitionColStatsCache.putAll(newPartitionColStats); } - public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName, + public synchronized void updatePartitionColStatsInCache(String dbName, String tableName, List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) { for (ColumnStatisticsObj colStatObj : colStatsObjs) { // Get old stats object if present @@ -421,15 +417,15 @@ public class SharedCache { } } - 
public static synchronized int getCachedPartitionCount() { + public synchronized int getCachedPartitionCount() { return partitionCache.size(); } - public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { + public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null; } - public static synchronized void addPartitionColStatsToCache(String dbName, String tableName, + public synchronized void addPartitionColStatsToCache(String dbName, String tableName, Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) { for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStatsPerPartition.entrySet()) { String partName = entry.getKey(); @@ -445,7 +441,7 @@ public class SharedCache { } } - public static synchronized void refreshPartitionColStats(String dbName, String tableName, + public synchronized void refreshPartitionColStats(String dbName, String tableName, Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) { LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName + " and table: " + tableName); @@ -453,7 +449,7 @@ public class SharedCache { addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); } - public static synchronized void addTableColStatsToCache(String dbName, String tableName, + public synchronized void addTableColStatsToCache(String dbName, String tableName, List<ColumnStatisticsObj> colStatsForTable) { for (ColumnStatisticsObj colStatObj : colStatsForTable) { String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); @@ -461,7 +457,7 @@ public class SharedCache { } } - public static synchronized void refreshTableColStats(String dbName, String tableName, + public synchronized void refreshTableColStats(String dbName, String tableName, List<ColumnStatisticsObj> colStatsForTable) { LOG.debug("CachedStore: updating cached table 
column stats objects for database: " + dbName + " and table: " + tableName); @@ -471,7 +467,7 @@ public class SharedCache { addTableColStatsToCache(dbName, tableName, colStatsForTable); } - public static void increSd(StorageDescriptor sd, byte[] sdHash) { + public void increSd(StorageDescriptor sd, byte[] sdHash) { ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); if (sdCache.containsKey(byteArray)) { sdCache.get(byteArray).refCount++; @@ -483,7 +479,7 @@ public class SharedCache { } } - public static void decrSd(byte[] sdHash) { + public void decrSd(byte[] sdHash) { ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray); sdWrapper.refCount--; @@ -492,13 +488,13 @@ public class SharedCache { } } - public static StorageDescriptor getSdFromCache(byte[] sdHash) { + public StorageDescriptor getSdFromCache(byte[] sdHash) { StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash)); return sdWrapper.getSd(); } // Replace databases in databaseCache with the new list - public static synchronized void refreshDatabases(List<Database> databases) { + public synchronized void refreshDatabases(List<Database> databases) { LOG.debug("CachedStore: updating cached database objects"); for (String dbName : listCachedDatabases()) { removeDatabaseFromCache(dbName); @@ -509,7 +505,7 @@ public class SharedCache { } // Replace tables in tableCache with the new list - public static synchronized void refreshTables(String dbName, List<Table> tables) { + public synchronized void refreshTables(String dbName, List<Table> tables) { LOG.debug("CachedStore: updating cached table objects for database: " + dbName); for (Table tbl : listCachedTables(dbName)) { removeTableFromCache(dbName, tbl.getTableName()); @@ -519,7 +515,7 @@ public class SharedCache { } } - public static synchronized void refreshPartitions(String dbName, String tblName, + public synchronized void refreshPartitions(String dbName, String 
tblName, List<Partition> partitions) { LOG.debug("CachedStore: updating cached partition objects for database: " + dbName + " and table: " + tblName); @@ -537,27 +533,27 @@ public class SharedCache { } @VisibleForTesting - static Map<String, Database> getDatabaseCache() { + Map<String, Database> getDatabaseCache() { return databaseCache; } @VisibleForTesting - static Map<String, TableWrapper> getTableCache() { + Map<String, TableWrapper> getTableCache() { return tableCache; } @VisibleForTesting - static Map<String, PartitionWrapper> getPartitionCache() { + Map<String, PartitionWrapper> getPartitionCache() { return partitionCache; } @VisibleForTesting - static Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() { + Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() { return sdCache; } @VisibleForTesting - static Map<String, ColumnStatisticsObj> getPartitionColStatsCache() { + Map<String, ColumnStatisticsObj> getPartitionColStatsCache() { return partitionColStatsCache; } }
