goiri commented on code in PR #5570: URL: https://github.com/apache/hadoop/pull/5570#discussion_r1171881329
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class FederationCache { + + protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; + protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID = + "getPoliciesConfigurations"; + protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID = + "getApplicationHomeSubCluster"; + + public abstract boolean isCachingEnabled(); + + public abstract void initCache(Configuration conf, FederationStateStore stateStore); + + public abstract void clearCache(); + + protected String buildCacheKey(String typeName, String methodName, String argName) { + StringBuilder buffer = new StringBuilder(); + buffer.append(typeName).append(".").append(methodName); + if (argName != null) { + buffer.append("::"); Review Comment: Make a constant. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java: ########## @@ -0,0 +1,263 @@ +package org.apache.hadoop.yarn.server.federation.cache; + Review Comment: Header ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java: ########## @@ -0,0 +1,263 @@ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class FederationJCache extends FederationCache { + + private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); + + private Cache<Object, Object> cache; + + private int cacheTimeToLive; + + private boolean isCachingEnabled = false; + + private FederationStateStore stateStore; + + private String className = getClass().getSimpleName(); + + @Override + public boolean isCachingEnabled() { + return isCachingEnabled; + } + + @Override + public void initCache(Configuration conf, FederationStateStore pStateStore) { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + this.stateStore = pStateStore; + if (cacheTimeToLive > 0) { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); + if (this.cache == null) { + String className = this.getClass().getSimpleName(); + LOG.info("Creating a JCache Manager with name {}.", className); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); + FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> cacheLoaderSingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>()); + CompleteConfiguration<Object, Object> configuration = + new MutableConfiguration<>().setStoreByValue(false) Review Comment: Split lines ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java: ########## @@ -0,0 +1,263 @@ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class FederationJCache extends FederationCache { + + private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); + + private Cache<Object, Object> cache; + + private int cacheTimeToLive; + + private boolean isCachingEnabled = false; + + private FederationStateStore stateStore; + + private String className = getClass().getSimpleName(); + + @Override + public boolean isCachingEnabled() { + return isCachingEnabled; + } + + @Override + public void initCache(Configuration conf, FederationStateStore pStateStore) { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + this.stateStore = pStateStore; + if (cacheTimeToLive > 0) { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); + if (this.cache == null) { + String className = this.getClass().getSimpleName(); + LOG.info("Creating a JCache Manager with name {}.", className); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); + FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> cacheLoaderSingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>()); + CompleteConfiguration<Object, Object> configuration = + new MutableConfiguration<>().setStoreByValue(false) + .setReadThrough(true) + .setExpiryPolicyFactory(expiryPolicySingletonFactory) + .setCacheLoaderFactory(cacheLoaderSingletonFactory); + this.cache = jcacheManager.createCache(className, configuration); + } + isCachingEnabled = true; + return; + } + isCachingEnabled = false; + } + + /** + * Internal class that implements the CacheLoader interface that can be + * plugged into the CacheManager to load objects into the cache for specified + * keys. + */ + private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> { + @SuppressWarnings("unchecked") + @Override + public V load(K key) throws CacheLoaderException { + try { + CacheRequest<K, V> query = (CacheRequest<K, V>) key; + assert query != null; + return query.getValue(); + } catch (Throwable ex) { + throw new CacheLoaderException(ex); + } + } + + @Override + public Map<K, V> loadAll(Iterable<? extends K> keys) + throws CacheLoaderException { + // The FACADE does not use the Cache's getAll API. Hence this is not + // required to be implemented + throw new NotImplementedException("Code is not implemented"); + } + } + + /** + * Internal class that encapsulates the cache key and a function that returns + * the value for the specified key. + */ + private class CacheRequest<K, V> { + private K key; + private Func<K, V> func; + + CacheRequest(K key, Func<K, V> func) { + this.key = key; + this.func = func; + } + + public V getValue() throws Exception { + return func.invoke(key); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((key == null) ? 0 : key.hashCode()); + return result; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + CacheRequest<K, V> other = (CacheRequest<K, V>) obj; + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + + return true; + } + } + + /** + * Encapsulates a method that has one parameter and returns a value of the + * type specified by the TResult parameter. + */ + protected interface Func<T, R> { + R invoke(T input) throws Exception; + } + + @Override + public void clearCache() { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + + jcacheManager.destroyCache(this.getClass().getSimpleName()); + this.cache = null; + } + + @Override + public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean filterInactiveSubClusters) + throws YarnException { + return (Map<SubClusterId, SubClusterInfo>) cache + .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } + + @Override + public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations() + throws YarnException { + return (Map<String, SubClusterPolicyConfiguration>) cache + .get(buildGetPoliciesConfigurationsCacheRequest()); + } + + @Override + public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) { + Object key = buildGetApplicationHomeSubClusterRequest(appId); + return (SubClusterId) cache.get(key); + } + + @Override + public void removeSubCluster(boolean flushCache) { + cache.remove(buildGetSubClustersCacheRequest(flushCache)); + } + + private Object buildGetSubClustersCacheRequest(final boolean filterInactiveSubClusters) { + final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID, + Boolean.toString(filterInactiveSubClusters)); + CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest = + new CacheRequest<>(cacheKey, + key -> { Review Comment: We don't do anything with the key? ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class FederationCache { + + protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; + protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID = + "getPoliciesConfigurations"; + protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID = + "getApplicationHomeSubCluster"; + + public abstract boolean isCachingEnabled(); + + public abstract void initCache(Configuration conf, FederationStateStore stateStore); + + public abstract void clearCache(); + + protected String buildCacheKey(String typeName, String methodName, String argName) { Review Comment: Put documentation with examples of input and outputs. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java: ########## @@ -0,0 +1,263 @@ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class FederationJCache extends FederationCache { + + private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); + + private Cache<Object, Object> cache; + + private int cacheTimeToLive; + + private boolean isCachingEnabled = false; + + private FederationStateStore stateStore; + + private String className = getClass().getSimpleName(); + + @Override + public boolean isCachingEnabled() { + return isCachingEnabled; + } + + @Override + public void initCache(Configuration conf, FederationStateStore pStateStore) { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + this.stateStore = pStateStore; + if (cacheTimeToLive > 0) { Review Comment: Reverse the if and return. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java: ########## @@ -0,0 +1,263 @@ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class FederationJCache extends FederationCache { + + private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); + + private Cache<Object, Object> cache; + + private int cacheTimeToLive; + + private boolean isCachingEnabled = false; + + private FederationStateStore stateStore; + + private String className = getClass().getSimpleName(); + + @Override + public boolean isCachingEnabled() { + return isCachingEnabled; + } + + @Override + public void initCache(Configuration conf, FederationStateStore pStateStore) { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + this.stateStore = pStateStore; + if (cacheTimeToLive > 0) { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); + if (this.cache == null) { + String className = this.getClass().getSimpleName(); + LOG.info("Creating a JCache Manager with name {}.", className); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); + FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> cacheLoaderSingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>()); + CompleteConfiguration<Object, Object> configuration = + new MutableConfiguration<>().setStoreByValue(false) + .setReadThrough(true) + .setExpiryPolicyFactory(expiryPolicySingletonFactory) + .setCacheLoaderFactory(cacheLoaderSingletonFactory); + this.cache = jcacheManager.createCache(className, configuration); + } + isCachingEnabled = true; + return; + } + isCachingEnabled = false; + } + + /** + * Internal class that implements the CacheLoader interface that can be + * plugged into the CacheManager to load objects into the cache for specified + * keys. + */ + private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> { + @SuppressWarnings("unchecked") + @Override + public V load(K key) throws CacheLoaderException { + try { + CacheRequest<K, V> query = (CacheRequest<K, V>) key; + assert query != null; + return query.getValue(); + } catch (Throwable ex) { + throw new CacheLoaderException(ex); + } + } + + @Override + public Map<K, V> loadAll(Iterable<? extends K> keys) + throws CacheLoaderException { + // The FACADE does not use the Cache's getAll API. Hence this is not + // required to be implemented + throw new NotImplementedException("Code is not implemented"); + } + } + + /** + * Internal class that encapsulates the cache key and a function that returns + * the value for the specified key. + */ + private class CacheRequest<K, V> { + private K key; + private Func<K, V> func; + + CacheRequest(K key, Func<K, V> func) { + this.key = key; + this.func = func; + } + + public V getValue() throws Exception { + return func.invoke(key); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((key == null) ? 0 : key.hashCode()); + return result; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + CacheRequest<K, V> other = (CacheRequest<K, V>) obj; + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + + return true; + } + } + + /** + * Encapsulates a method that has one parameter and returns a value of the + * type specified by the TResult parameter. + */ + protected interface Func<T, R> { + R invoke(T input) throws Exception; + } + + @Override + public void clearCache() { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + + jcacheManager.destroyCache(this.getClass().getSimpleName()); + this.cache = null; + } + + @Override + public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean filterInactiveSubClusters) + throws YarnException { + return (Map<SubClusterId, SubClusterInfo>) cache + .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } + + @Override + public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations() + throws YarnException { + return (Map<String, SubClusterPolicyConfiguration>) cache + .get(buildGetPoliciesConfigurationsCacheRequest()); + } + + @Override + public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) { + Object key = buildGetApplicationHomeSubClusterRequest(appId); + return (SubClusterId) cache.get(key); + } + + @Override + public void removeSubCluster(boolean flushCache) { + cache.remove(buildGetSubClustersCacheRequest(flushCache)); + } + + private Object buildGetSubClustersCacheRequest(final boolean filterInactiveSubClusters) { + final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID, + Boolean.toString(filterInactiveSubClusters)); + CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest = + new CacheRequest<>(cacheKey, + key -> { + GetSubClustersInfoRequest request = + GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters); + GetSubClustersInfoResponse subClusters = stateStore.getSubClusters(request); + return buildSubClusterInfoMap(subClusters); + }); + return cacheRequest; + } + + private Object buildGetPoliciesConfigurationsCacheRequest() { + final String cacheKey = buildCacheKey(className, + GET_POLICIES_CONFIGURATIONS_CACHEID, null); + CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> cacheRequest = + new CacheRequest<>(cacheKey, + key -> { + GetSubClusterPoliciesConfigurationsRequest request = + GetSubClusterPoliciesConfigurationsRequest.newInstance(); + GetSubClusterPoliciesConfigurationsResponse policyConfigs = + stateStore.getPoliciesConfigurations(request); + return buildPolicyConfigMap(policyConfigs); + }); + return cacheRequest; + } + + private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) { + final String cacheKey = buildCacheKey(getClass().getSimpleName(), + GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString()); + CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>( + cacheKey, + input -> { Review Comment: Is the indentation correct? ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java: ########## @@ -0,0 +1,263 @@ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class FederationJCache extends FederationCache { + + private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); + + private Cache<Object, Object> cache; + + private int cacheTimeToLive; + + private boolean isCachingEnabled = false; + + private FederationStateStore stateStore; + + private String className = getClass().getSimpleName(); + + @Override + public boolean isCachingEnabled() { + return isCachingEnabled; + } + + @Override + public void initCache(Configuration conf, FederationStateStore pStateStore) { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + this.stateStore = pStateStore; + if (cacheTimeToLive > 0) { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); + if (this.cache == null) { + String className = this.getClass().getSimpleName(); + LOG.info("Creating a JCache Manager with name {}.", className); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); + FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> cacheLoaderSingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>()); + CompleteConfiguration<Object, Object> configuration = + new MutableConfiguration<>().setStoreByValue(false) + .setReadThrough(true) + .setExpiryPolicyFactory(expiryPolicySingletonFactory) + .setCacheLoaderFactory(cacheLoaderSingletonFactory); + this.cache = jcacheManager.createCache(className, configuration); + } + isCachingEnabled = true; + return; + } + isCachingEnabled = false; + } + + /** + * Internal class that implements the CacheLoader interface that can be + * plugged into the CacheManager to load objects into the cache for specified + * keys. + */ + private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> { + @SuppressWarnings("unchecked") + @Override + public V load(K key) throws CacheLoaderException { + try { + CacheRequest<K, V> query = (CacheRequest<K, V>) key; + assert query != null; + return query.getValue(); + } catch (Throwable ex) { + throw new CacheLoaderException(ex); + } + } + + @Override + public Map<K, V> loadAll(Iterable<? extends K> keys) + throws CacheLoaderException { + // The FACADE does not use the Cache's getAll API. Hence this is not + // required to be implemented + throw new NotImplementedException("Code is not implemented"); + } + } + + /** + * Internal class that encapsulates the cache key and a function that returns + * the value for the specified key. + */ + private class CacheRequest<K, V> { + private K key; + private Func<K, V> func; + + CacheRequest(K key, Func<K, V> func) { + this.key = key; + this.func = func; + } + + public V getValue() throws Exception { + return func.invoke(key); + } + + @Override + public int hashCode() { + final int prime = 31; Review Comment: Can we use the HashBuilder and the EqualsBuilder? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
