[ https://issues.apache.org/jira/browse/HIVE-23949?focusedWorklogId=464372&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-464372 ]
ASF GitHub Bot logged work on HIVE-23949:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jul/20 07:34
Start Date: 30/Jul/20 07:34
Worklog Time Spent: 10m
Work Description: jcamachor commented on a change in pull request #1317:
URL: https://github.com/apache/hive/pull/1317#discussion_r462584582
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3665,33 +3678,39 @@ public boolean dropPartition(String dbName, String tableName, List<String> parti
    * @return list of partition objects
    */
   public List<Partition> getPartitions(Table tbl) throws HiveException {
-    if (tbl.isPartitioned()) {
-      List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
-      try {
-        GetPartitionsPsWithAuthRequest req = new GetPartitionsPsWithAuthRequest();
-        req.setTblName(tbl.getTableName());
-        req.setDbName(tbl.getDbName());
-        req.setUserName(getUserName());
-        req.setMaxParts((short) -1);
-        req.setGroupNames(getGroupNames());
-        if (AcidUtils.isTransactionalTable(tbl)) {
-          ValidWriteIdList validWriteIdList = getValidWriteIdList(tbl.getDbName(), tbl.getTableName());
-          req.setValidWriteIdList(validWriteIdList != null ? validWriteIdList.toString() : null);
-        }
-        GetPartitionsPsWithAuthResponse res = getMSC().listPartitionsWithAuthInfoRequest(req);
-        tParts = res.getPartitions();
+    long t1 = System.nanoTime();
+    try {
+      if (tbl.isPartitioned()) {
+        List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
+        try {
+          GetPartitionsPsWithAuthRequest req = new GetPartitionsPsWithAuthRequest();
+          req.setTblName(tbl.getTableName());
+          req.setDbName(tbl.getDbName());
+          req.setUserName(getUserName());
+          req.setMaxParts((short) -1);
+          req.setGroupNames(getGroupNames());
+          if (AcidUtils.isTransactionalTable(tbl)) {
+            ValidWriteIdList validWriteIdList = getValidWriteIdList(tbl.getDbName(), tbl.getTableName());
+            req.setValidWriteIdList(validWriteIdList != null ? validWriteIdList.toString() : null);
+          }
+          GetPartitionsPsWithAuthResponse res = getMSC().listPartitionsWithAuthInfoRequest(req);
+          tParts = res.getPartitions();
-      } catch (Exception e) {
-        LOG.error(StringUtils.stringifyException(e));
-        throw new HiveException(e);
-      }
-      List<Partition> parts = new ArrayList<>(tParts.size());
-      for (org.apache.hadoop.hive.metastore.api.Partition tpart : tParts) {
-        parts.add(new Partition(tbl, tpart));
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw new HiveException(e);
+        }
+        List<Partition> parts = new ArrayList<>(tParts.size());
+        for (org.apache.hadoop.hive.metastore.api.Partition tpart : tParts) {
+          parts.add(new Partition(tbl, tpart));
+        }
+        return parts;
+      } else {
+        return Collections.singletonList(new Partition(tbl));
       }
-      return parts;
-    } else {
-      return Collections.singletonList(new Partition(tbl));
+    } finally {
Review comment:
I did not catch this comment in the review. I was taking a look at the code and I think there are two problems with the current approach (sorry about the back and forth).
1) Currently we are logging the information twice (PerfLogger already has its own logging mechanism when logging is set to debug level). We should remove the logging lines outside of it.
2) I think we should have left the PerfLogEnd call in the `finally` block. I saw that in some methods we catch an exception and ignore it, which could lead to leaks since we never call PerfLogEnd. Even if this is not a problem today, it would be difficult to verify that nothing changes there in the future. The safest path is to make that call in the `finally` block so that it is always executed.
Thoughts?
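A minimal sketch of the shape being suggested, using the usual Hive PerfLogger API (the `HIVE_GET_PARTITIONS` constant and `doGetPartitions` helper are illustrative, not the patch's actual names):
```java
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.HIVE_GET_PARTITIONS);
try {
  return doGetPartitions(tbl);  // may throw, or catch-and-ignore internally
} finally {
  // Always closes the perf-log scope, even on the swallowed-exception paths.
  perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.HIVE_GET_PARTITIONS);
}
```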
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3665,33 +3678,39 @@ public boolean dropPartition(String dbName, String tableName, List<String> parti
+        return parts;
+      } else {
+        return Collections.singletonList(new Partition(tbl));
       }
-      return parts;
-    } else {
-      return Collections.singletonList(new Partition(tbl));
+    } finally {
Review comment:
I just checked the code and, for 2., there are quite a few occurrences. I think we may at least move it to `finally` for the methods where we are already catching exceptions. For the others, we can create a follow-up to explore that, since it may be a more widespread issue that affects other code paths where PerfLogger is used too.
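For reference, the leak pattern the comment describes looks like this in minimal form (names illustrative): the PerfLogEnd on the success path is skipped whenever the body throws, and the catch block then swallows the exception without closing the scope.
```java
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.HIVE_GET_TABLE);
try {
  result = getMSC().getTable(req);
  perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.HIVE_GET_TABLE);  // skipped if getTable throws
} catch (Exception e) {
  LOG.warn("Ignoring exception", e);  // swallowed: the perf-log scope is never closed
}
```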
##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+package org.apache.hadoop.hive.metastore;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsSpecByExprResult;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.thrift.TException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+
+public class HiveMetaStoreClientWithLocalCache extends HiveMetaStoreClient {
+
+  private static Cache<CacheKey, Object> mscLocalCache = null;
+  //TODO: initialize in the init method
Review comment:
nit. This line can be removed?
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -487,6 +487,13 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
    * in the underlying Hadoop configuration.
    */
   public static enum ConfVars {
+    MSC_CACHE_ENABLED("hive.metastore.client.cache.enabled", true,
+        "This property enables a Caffeine cache for the metastore client"),
+    MSC_CACHE_MAX_SIZE("hive.metastore.client.cache.maxSize", "1Gb", new SizeValidator(),
+        "Set the maximum size (number of bytes) of the metastore client cache (DEFAULT: 1GB). " +
+        "Only in effect when the cache is enabled"),
+    MSC_CACHE_RECORD_STATS("hive.metastore.client.cache.recordStats", true,
Review comment:
The default for this property should be `false` (you can set it to `true` in the test `hive-site` files, though).
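For instance, a test-only override would then live in the test configuration rather than in the code default (a hypothetical `hive-site.xml` fragment, assuming tests pick up the property by name):
```xml
<!-- Test-only override: recordStats stays disabled by default in code. -->
<property>
  <name>hive.metastore.client.cache.recordStats</name>
  <value>true</value>
</property>
```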
##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+package org.apache.hadoop.hive.metastore;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsSpecByExprResult;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.thrift.TException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+
+public class HiveMetaStoreClientWithLocalCache extends HiveMetaStoreClient {
+
+  private static Cache<CacheKey, Object> mscLocalCache = null;
+  //TODO: initialize in the init method
+  private static boolean IS_CACHE_ENABLED;
+  private static long MAX_SIZE;
+  private static boolean RECORD_STATS;
+  private static HashMap<Class<?>, ObjectEstimator> sizeEstimator = null;
+  private static String cacheObjName = null;
+
+  public static synchronized void init() {
+    if (mscLocalCache != null) return; // init cache only once
+    Configuration metaConf = MetastoreConf.newMetastoreConf();
+    LOG.debug("Initializing local cache in HiveMetaStoreClient...");
+    MAX_SIZE = MetastoreConf.getSizeVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_MAX_SIZE);
+    IS_CACHE_ENABLED = MetastoreConf.getBoolVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_ENABLED);
+    RECORD_STATS = MetastoreConf.getBoolVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_RECORD_STATS);
+    initSizeEstimator();
+    initCache();
+    LOG.debug("Local cache initialized in HiveMetaStoreClient: " + mscLocalCache);
+  }
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf) throws MetaException {
+    this(conf, null, true);
+  }
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf, HiveMetaHookLoader hookLoader) throws MetaException {
+    this(conf, hookLoader, true);
+  }
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded) throws MetaException {
+    super(conf, hookLoader, allowEmbedded);
+  }
+
+  private static void initSizeEstimator() {
+    sizeEstimator = new HashMap<>();
+    IncrementalObjectSizeEstimator.createEstimators(CacheKey.class, sizeEstimator);
+    Arrays.stream(KeyType.values()).forEach(e -> {
+      IncrementalObjectSizeEstimator.createEstimators(e.keyClass, sizeEstimator);
+      IncrementalObjectSizeEstimator.createEstimators(e.valueClass, sizeEstimator);
+    });
+  }
+
+  /**
+   * KeyType is used to differentiate the request types. More types can be added in future.
+   */
+  public enum KeyType {
+    PARTITIONS_BY_EXPR(PartitionsByExprRequest.class, PartitionsByExprResult.class),
+    PARTITIONS_SPEC_BY_EXPR(PartitionsByExprRequest.class, PartitionsSpecByExprResult.class);
+
+    private final Class<?> keyClass;
+    private final Class<?> valueClass;
+
+    KeyType(Class<?> keyClass, Class<?> valueClass) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+  }
+
+  /**
+   * CacheKey objects are used as key for the cache.
+   */
+  public static class CacheKey{
+    KeyType IDENTIFIER;
+    Object obj;
+
+    public CacheKey(KeyType IDENTIFIER, Object obj) {
+      this.IDENTIFIER = IDENTIFIER;
+      this.obj = obj;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CacheKey cacheKey = (CacheKey) o;
+      return IDENTIFIER == cacheKey.IDENTIFIER &&
+          Objects.equals(obj, cacheKey.obj);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(IDENTIFIER, obj);
+    }
+  }
+
+  private static int getWeight(CacheKey key, Object val) {
+    ObjectEstimator keySizeEstimator = sizeEstimator.get(key.getClass());
+    ObjectEstimator valSizeEstimator = sizeEstimator.get(key.IDENTIFIER.valueClass);
+    int keySize = keySizeEstimator.estimate(key, sizeEstimator);
+    int valSize = valSizeEstimator.estimate(val, sizeEstimator);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cache entry weight - key: {}, value: {}, total: {}", keySize, valSize, keySize + valSize);
+    }
+    return keySize + valSize;
+  }
+
+  private Object load(CacheKey key) {
+    try {
+      return getResultObject(key);
+    } catch (TException e) {
+      throw new UncheckedCacheException(e);
+    }
+  }
+
+  /**
+   * Initializes the cache
+   */
+  private static void initCache() {
+    int initSize = 100;
+    Caffeine<CacheKey, Object> cacheBuilder = Caffeine.newBuilder()
+        .initialCapacity(initSize)
+        .maximumWeight(MAX_SIZE)
+        .weigher(HiveMetaStoreClientWithLocalCache::getWeight)
+        .removalListener((key, val, cause) -> {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Caffeine - ({}, {}) was removed ({})", key, val, cause);
+          }});
+    if (RECORD_STATS) {
+      cacheBuilder.recordStats();
+    }
+    mscLocalCache = cacheBuilder.build();
+    cacheObjName = mscLocalCache.toString().substring(mscLocalCache.toString().indexOf("Cache@"));
+  }
+
+  /**
+   * This method is used to load the cache by calling relevant APIs, depending on the type of the request.
+   *
+   * @param cacheKey key of the cache, containing an identifier and a request object
+   * @return Result object / null
+   * @throws TException
+   */
+  private Object getResultObject(CacheKey cacheKey) throws TException {
+    Object result = null;
+
+    switch (cacheKey.IDENTIFIER) {
+    case PARTITIONS_BY_EXPR:
+      result = super.getPartitionsByExprResult((PartitionsByExprRequest) cacheKey.obj);
+      break;
+    case PARTITIONS_SPEC_BY_EXPR:
+      result = super.getPartitionsSpecByExprResult((PartitionsByExprRequest) cacheKey.obj);
+      break;
+    default:
+      break;
+    }
+
+    return result;
+  }
+
+  @Override
+  protected PartitionsByExprResult getPartitionsByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_BY_EXPR)) {
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_EXPR, req);
+      try {
+        r = (PartitionsByExprResult) mscLocalCache.get(cacheKey, this::load); // get either the result or an Exception
+
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
+
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {
+          throw new TException(e.getCause());
+        }
+      }
+    } else {
+      r = client.get_partitions_by_expr(req);
Review comment:
nit. whitespace
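The cache construction above bounds memory by a weigher rather than an entry count: each entry contributes its estimated size, and eviction starts once the sum exceeds `maximumWeight`. A self-contained sketch of that Caffeine mechanism with toy String weights (not the PR's size estimators):
```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class WeighedCacheDemo {
  public static void main(String[] args) {
    Cache<String, String> cache = Caffeine.newBuilder()
        .maximumWeight(1024)  // total budget shared by all entries, in weigher units
        .weigher((String k, String v) -> k.length() + v.length())
        .recordStats()
        .build();
    cache.put("hive", "metastore");
    cache.getIfPresent("hive");                    // counted as a hit
    System.out.println(cache.stats().hitCount());  // 1
  }
}
```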
##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {
Review comment:
nit. Maybe we could check whether it is already a `TException` so we do not wrap it in a second one.
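A sketch of what the suggested check could look like inside that catch block (`UncheckedCacheException` is the PR's own wrapper; `MetaException` already extends `TException`, so the first branch covers it too):
```java
} catch (UncheckedCacheException e) {
  Throwable cause = e.getCause();
  if (cause instanceof TException) {
    throw (TException) cause;   // rethrows MetaException/TException as-is
  }
  throw new TException(cause);  // wrap anything else exactly once
}
```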
##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+  @Override
+  protected PartitionsSpecByExprResult getPartitionsSpecByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsSpecByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_SPEC_BY_EXPR)) {
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, req);
+      try {
+        r = (PartitionsSpecByExprResult) mscLocalCache.get(cacheKey, this::load); // get either the result or an Exception
+
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
+
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {
Review comment:
nit. Same comment as above (TException).
##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {
+          throw new TException(e.getCause());
+        }
+      }
+    } else {
+      r = client.get_partitions_spec_by_expr(req);
Review comment:
nit. whitespace
##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+  @Override
+  protected PartitionsSpecByExprResult getPartitionsSpecByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsSpecByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_SPEC_BY_EXPR)) {
Review comment:
nit. Maybe we could create the CacheKey before the `if` clause and pass it to `isRequestCachable`. Feel free to ignore this if you think it does not make sense.
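A sketch of that reordering (the single-argument `isRequestCachable(CacheKey)` signature is hypothetical, assuming it can read the request and key type off the key):
```java
CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, req);
if (isCacheEnabledAndInitialized() && isRequestCachable(cacheKey)) {
  r = (PartitionsSpecByExprResult) mscLocalCache.get(cacheKey, this::load);
} else {
  r = client.get_partitions_spec_by_expr(req);
}
```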
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 464372)
Time Spent: 20m (was: 10m)
> Introduce caching layer in HS2 to accelerate query compilation
> --------------------------------------------------------------
>
> Key: HIVE-23949
> URL: https://issues.apache.org/jira/browse/HIVE-23949
> Project: Hive
> Issue Type: New Feature
> Components: HiveServer2
> Reporter: Soumyakanti Das
> Assignee: Soumyakanti Das
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> One of the major contributors to compilation latency is the retrieval of
> metadata from HMS. This JIRA introduces a caching layer in HS2 for this
> metadata. Its design is simple, relying on snapshot information being queried
> to cache and invalidate the metadata. This will help us to reduce the time
> spent in compilation by using HS2 memory more effectively, and it will allow
> us to improve HMS throughput for multi-tenant workloads by reducing the
> number of calls it needs to serve.
> This patch only caches partition retrieval information, which is often one of
> the most costly metadata operations. It also sets the foundation to cache
> additional calls in follow-up work.
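As a rough illustration of the snapshot-based design (the method and type names follow the patch, but the request wiring below is a simplified assumption): the transactional snapshot rides inside the request, and the request is part of the cache key, so any table change yields a new ValidWriteIdList, a different key, and therefore a guaranteed cache miss. Stale entries are never served; they simply age out under the size-based eviction policy.
```java
PartitionsByExprRequest req = new PartitionsByExprRequest();
req.setDbName(dbName);
req.setTblName(tblName);
req.setExpr(exprBytes);
req.setValidWriteIdList(getValidWriteIdList(dbName, tblName).toString());
CacheKey key = new CacheKey(KeyType.PARTITIONS_BY_EXPR, req);  // equals/hashCode include the snapshot
```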
--
This message was sent by Atlassian Jira
(v8.3.4#803005)