slfan1989 commented on code in PR #5570:
URL: https://github.com/apache/hadoop/pull/5570#discussion_r1171996080


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  private Cache<Object, Object> cache;
+
+  private int cacheTimeToLive;
+
+  private boolean isCachingEnabled = false;
+
+  private FederationStateStore stateStore;
+
+  private String className = getClass().getSimpleName();
+
+  @Override
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = 
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+    if (cacheTimeToLive > 0) {
+      CachingProvider jcacheProvider = Caching.getCachingProvider();
+      CacheManager jcacheManager = jcacheProvider.getCacheManager();
+      this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
+      if (this.cache == null) {
+        String className = this.getClass().getSimpleName();
+        LOG.info("Creating a JCache Manager with name {}.", className);
+        Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+        FactoryBuilder.SingletonFactory<ExpiryPolicy> 
expiryPolicySingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new 
CreatedExpiryPolicy(cacheExpiry));
+        FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> 
cacheLoaderSingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+        CompleteConfiguration<Object, Object> configuration =
+            new MutableConfiguration<>().setStoreByValue(false)
+            .setReadThrough(true)
+            .setExpiryPolicyFactory(expiryPolicySingletonFactory)
+            .setCacheLoaderFactory(cacheLoaderSingletonFactory);
+        this.cache = jcacheManager.createCache(className, configuration);
+      }
+      isCachingEnabled = true;
+      return;
+    }
+    isCachingEnabled = false;
+  }
+
+  /**
+   * Internal class that implements the CacheLoader interface that can be
+   * plugged into the CacheManager to load objects into the cache for specified
+   * keys.
+   */
+  private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public V load(K key) throws CacheLoaderException {
+      try {
+        CacheRequest<K, V> query = (CacheRequest<K, V>) key;
+        assert query != null;
+        return query.getValue();
+      } catch (Throwable ex) {
+        throw new CacheLoaderException(ex);
+      }
+    }
+
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+        throws CacheLoaderException {
+      // The FACADE does not use the Cache's getAll API. Hence this is not
+      // required to be implemented
+      throw new NotImplementedException("Code is not implemented");
+    }
+  }
+
+  /**
+   * Internal class that encapsulates the cache key and a function that returns
+   * the value for the specified key.
+   */
+  private class CacheRequest<K, V> {
+    private K key;
+    private Func<K, V> func;
+
+    CacheRequest(K key, Func<K, V> func) {
+      this.key = key;
+      this.func = func;
+    }
+
+    public V getValue() throws Exception {
+      return func.invoke(key);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((key == null) ? 0 : key.hashCode());
+      return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
+      if (key == null) {
+        if (other.key != null) {
+          return false;
+        }
+      } else if (!key.equals(other.key)) {
+        return false;
+      }
+
+      return true;
+    }
+  }
+
+  /**
+   * Encapsulates a method that has one parameter and returns a value of the
+   * type specified by the TResult parameter.
+   */
+  protected interface Func<T, R> {
+    R invoke(T input) throws Exception;
+  }
+
+  @Override
+  public void clearCache() {
+    CachingProvider jcacheProvider = Caching.getCachingProvider();
+    CacheManager jcacheManager = jcacheProvider.getCacheManager();
+
+    jcacheManager.destroyCache(this.getClass().getSimpleName());
+    this.cache = null;
+  }
+
+  @Override
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean 
filterInactiveSubClusters)
+      throws YarnException {
+    return (Map<SubClusterId, SubClusterInfo>) cache
+        .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+  }
+
+  @Override
+  public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
+      throws YarnException {
+    return (Map<String, SubClusterPolicyConfiguration>) cache
+        .get(buildGetPoliciesConfigurationsCacheRequest());
+  }
+
+  @Override
+  public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) {
+    Object key = buildGetApplicationHomeSubClusterRequest(appId);
+    return (SubClusterId) cache.get(key);
+  }
+
+  @Override
+  public void removeSubCluster(boolean flushCache) {
+    cache.remove(buildGetSubClustersCacheRequest(flushCache));
+  }
+
+  private Object buildGetSubClustersCacheRequest(final boolean 
filterInactiveSubClusters) {
+    final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
+       Boolean.toString(filterInactiveSubClusters));
+    CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
+        new CacheRequest<>(cacheKey,
+        key -> {
+          GetSubClustersInfoRequest request =
+              GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters);
+          GetSubClustersInfoResponse subClusters = 
stateStore.getSubClusters(request);
+          return buildSubClusterInfoMap(subClusters);
+        });
+    return cacheRequest;
+  }
+
+  private Object buildGetPoliciesConfigurationsCacheRequest() {
+    final String cacheKey = buildCacheKey(className,
+       GET_POLICIES_CONFIGURATIONS_CACHEID, null);
+    CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> 
cacheRequest =
+        new CacheRequest<>(cacheKey,
+        key -> {
+          GetSubClusterPoliciesConfigurationsRequest request =
+              GetSubClusterPoliciesConfigurationsRequest.newInstance();
+          GetSubClusterPoliciesConfigurationsResponse policyConfigs =
+              stateStore.getPoliciesConfigurations(request);
+          return buildPolicyConfigMap(policyConfigs);
+        });
+    return cacheRequest;
+  }
+
+  private Object buildGetApplicationHomeSubClusterRequest(ApplicationId 
applicationId) {
+    final String cacheKey = buildCacheKey(getClass().getSimpleName(),
+       GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
+    CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
+        cacheKey,
+            input -> {

Review Comment:
   I will modify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to