Repository: incubator-ranger Updated Branches: refs/heads/master f9fb61102 -> e3f0f41d7
RANGER-476:ServiceName should be used in Lookup Connection cache in Connection Manager instead of ServiceType as we can have multiple Services for same service type Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/e3f0f41d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/e3f0f41d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/e3f0f41d Branch: refs/heads/master Commit: e3f0f41d7d8b4637dce50680c463578f1eacb9a9 Parents: f9fb611 Author: rmani <[email protected]> Authored: Thu May 14 15:51:06 2015 -0700 Committer: rmani <[email protected]> Committed: Thu May 14 15:51:06 2015 -0700 ---------------------------------------------------------------------- .../hbase/client/HBaseConnectionMgr.java | 25 +++++++++++--------- .../services/hbase/client/HBaseResourceMgr.java | 6 +++-- .../services/hdfs/client/HdfsConnectionMgr.java | 25 +++++++++++--------- .../services/hdfs/client/HdfsResourceMgr.java | 7 ++++-- .../services/knox/client/KnoxResourceMgr.java | 8 +++---- .../services/kms/client/KMSResourceMgr.java | 9 ++++--- .../services/yarn/client/YarnResourceMgr.java | 4 +++- .../services/storm/client/StormResourceMgr.java | 6 +++-- 8 files changed, 54 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseConnectionMgr.java ---------------------------------------------------------------------- diff --git a/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseConnectionMgr.java b/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseConnectionMgr.java index 5c1c73b..a69c907 100644 --- a/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseConnectionMgr.java +++ b/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseConnectionMgr.java @@ -19,10 +19,11 @@ package org.apache.ranger.services.hbase.client; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -34,13 +35,13 @@ public class HBaseConnectionMgr { private static Logger LOG = Logger.getLogger(HBaseConnectionMgr.class); - protected HashMap<String, HBaseClient> hbaseConnectionCache; + protected ConcurrentMap<String, HBaseClient> hbaseConnectionCache; - protected HashMap<String, Boolean> repoConnectStatusMap; + protected ConcurrentMap<String, Boolean> repoConnectStatusMap; public HBaseConnectionMgr() { - hbaseConnectionCache = new HashMap<String, HBaseClient>(); - repoConnectStatusMap = new HashMap<String, Boolean>(); + hbaseConnectionCache = new ConcurrentHashMap<String, HBaseClient>(); + repoConnectStatusMap = new ConcurrentHashMap<String, Boolean>(); } public HBaseClient getHBaseConnection(final String serviceName, final String serviceType, final Map<String,String> configs) { @@ -48,8 +49,7 @@ public class HBaseConnectionMgr { HBaseClient client = null; if (serviceType != null) { // get it from the cache - synchronized (hbaseConnectionCache) { - client = hbaseConnectionCache.get(serviceType); + client = hbaseConnectionCache.get(serviceName); if (client == null) { if ( configs == null ) { final Callable<HBaseClient> connectHBase = new Callable<HBaseClient>() { @@ -102,7 +102,11 @@ public class HBaseConnectionMgr { } if(client!=null){ - hbaseConnectionCache.put(serviceType, client); + HBaseClient oldClient = hbaseConnectionCache.putIfAbsent(serviceName, client); + if (oldClient != null) { + // in the meantime someone else has put a valid client into the cache, let's use that instead. + client = oldClient; + } } } else { @@ -110,12 +114,11 @@ public class HBaseConnectionMgr { List<String> testConnect = client.getTableList(".\\*",null); if(testConnect == null){ - hbaseConnectionCache.remove(serviceType); + hbaseConnectionCache.remove(serviceName); client = getHBaseConnection(serviceName,serviceType,configs); } } - repoConnectStatusMap.put(serviceType, true); - } + repoConnectStatusMap.put(serviceName, true); } else { LOG.error("Service Name not found with name " + serviceName, new Throwable()); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseResourceMgr.java ---------------------------------------------------------------------- diff --git a/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseResourceMgr.java b/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseResourceMgr.java index 4ce6a8d..1e4f6d0 100644 --- a/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseResourceMgr.java +++ b/hbase-agent/src/main/java/org/apache/ranger/services/hbase/client/HBaseResourceMgr.java @@ -136,8 +136,10 @@ public class HBaseResourceMgr { } } if (callableObj != null) { - resultList = TimedEventUtil.timedTask(callableObj, 5, - TimeUnit.SECONDS); + synchronized(hBaseClient) { + resultList = TimedEventUtil.timedTask(callableObj, 5, + TimeUnit.SECONDS); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsConnectionMgr.java ---------------------------------------------------------------------- diff --git a/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsConnectionMgr.java b/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsConnectionMgr.java index d62bb9c..400a379 100644 --- a/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsConnectionMgr.java +++ b/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsConnectionMgr.java @@ -19,10 +19,11 @@ package org.apache.ranger.services.hdfs.client; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -32,14 +33,14 @@ import org.apache.ranger.services.hdfs.client.HdfsClient; public class HdfsConnectionMgr { - protected Map<String, HdfsClient> hdfdsConnectionCache = null; - protected Map<String, Boolean> repoConnectStatusMap = null; + protected ConcurrentMap<String, HdfsClient> hdfsConnectionCache = null; + protected ConcurrentMap<String, Boolean> repoConnectStatusMap = null; private static Logger LOG = Logger.getLogger(HdfsConnectionMgr.class); public HdfsConnectionMgr(){ - hdfdsConnectionCache = new HashMap<String, HdfsClient>(); - repoConnectStatusMap = new HashMap<String, Boolean>(); + hdfsConnectionCache = new ConcurrentHashMap<String, HdfsClient>(); + repoConnectStatusMap = new ConcurrentHashMap<String, Boolean>(); } @@ -47,8 +48,7 @@ public class HdfsConnectionMgr { HdfsClient hdfsClient = null; if (serviceType != null) { // get it from the cache - synchronized (hdfdsConnectionCache) { - hdfsClient = hdfdsConnectionCache.get(serviceType); + hdfsClient = hdfsConnectionCache.get(serviceName); if (hdfsClient == null) { if(configs == null) { final Callable<HdfsClient> connectHDFS = new Callable<HdfsClient>() { @@ -81,16 +81,19 @@ public class HdfsConnectionMgr { + serviceName + " using configuration : " + configs, e); } } - hdfdsConnectionCache.put(serviceType, hdfsClient); - repoConnectStatusMap.put(serviceType, true); + HdfsClient oldClient = hdfsConnectionCache.putIfAbsent(serviceName, hdfsClient); + if (oldClient != null) { + // in the meantime someone else has put a valid client into the cache, let's use that instead. + hdfsClient = oldClient; + } + repoConnectStatusMap.put(serviceName, true); } else { List<String> testConnect = hdfsClient.listFiles("/", "*",null); if(testConnect == null){ - hdfdsConnectionCache.put(serviceType, hdfsClient); + hdfsConnectionCache.put(serviceName, hdfsClient); hdfsClient = getHadoopConnection(serviceName,serviceType,configs); } } - } } else { LOG.error("Serice not found with name "+serviceName, new Throwable()); } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsResourceMgr.java ---------------------------------------------------------------------- diff --git a/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsResourceMgr.java b/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsResourceMgr.java index 9161a5a..73ce00d 100644 --- a/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsResourceMgr.java +++ b/hdfs-agent/src/main/java/org/apache/ranger/services/hdfs/client/HdfsResourceMgr.java @@ -105,8 +105,11 @@ public class HdfsResourceMgr { } }; - - resultList = TimedEventUtil.timedTask(callableObj, 5,TimeUnit.SECONDS); + if ( callableObj != null) { + synchronized(hdfsClient) { + resultList = TimedEventUtil.timedTask(callableObj, 5,TimeUnit.SECONDS); + } + } if(LOG.isDebugEnabled()) { LOG.debug("Resource dir : " + userInput + " wild card to match : " + wildCardToMatch http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/knox-agent/src/main/java/org/apache/ranger/services/knox/client/KnoxResourceMgr.java ---------------------------------------------------------------------- diff --git a/knox-agent/src/main/java/org/apache/ranger/services/knox/client/KnoxResourceMgr.java b/knox-agent/src/main/java/org/apache/ranger/services/knox/client/KnoxResourceMgr.java index e0206e7..cf551b9 100644 --- a/knox-agent/src/main/java/org/apache/ranger/services/knox/client/KnoxResourceMgr.java +++ b/knox-agent/src/main/java/org/apache/ranger/services/knox/client/KnoxResourceMgr.java @@ -101,10 +101,10 @@ public class KnoxResourceMgr { } final KnoxClient knoxClient = new KnoxConnectionMgr().getKnoxClient(knoxUrl, knoxAdminUser, knoxAdminPassword); - resultList = KnoxClient.getKnoxResources(knoxClient, knoxTopologyName, knoxServiceName,knoxTopologyList,knoxServiceList); - + + synchronized(knoxClient) { + resultList = KnoxClient.getKnoxResources(knoxClient, knoxTopologyName, knoxServiceName,knoxTopologyList,knoxServiceList); + } return resultList; } - - } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/plugin-kms/src/main/java/org/apache/ranger/services/kms/client/KMSResourceMgr.java ---------------------------------------------------------------------- diff --git a/plugin-kms/src/main/java/org/apache/ranger/services/kms/client/KMSResourceMgr.java b/plugin-kms/src/main/java/org/apache/ranger/services/kms/client/KMSResourceMgr.java index 94ca822..007b97b 100755 --- a/plugin-kms/src/main/java/org/apache/ranger/services/kms/client/KMSResourceMgr.java +++ b/plugin-kms/src/main/java/org/apache/ranger/services/kms/client/KMSResourceMgr.java @@ -54,8 +54,8 @@ public class KMSResourceMgr { String userInput = context.getUserInput(); Map<String, List<String>> resourceMap = context.getResources(); List<String> resultList = null; - List<String> kmsKeyList = null; - String kmsKeyName = null; + List<String> kmsKeyList = null; + String kmsKeyName = null; if ( resourceMap != null && !resourceMap.isEmpty() && resourceMap.get(KMSKEY) != null ) { kmsKeyName = userInput; @@ -78,8 +78,11 @@ public class KMSResourceMgr { } public static List<String> getKMSResource(String url, String username, String password,String kmsKeyName, List<String> kmsKeyList) { + List<String> topologyList = null; final KMSClient KMSClient = KMSConnectionMgr.getKMSClient(url, username, password); - List<String> topologyList = KMSClient.getKeyList(kmsKeyName, kmsKeyList); + synchronized(KMSClient){ + topologyList = KMSClient.getKeyList(kmsKeyName, kmsKeyList); + } return topologyList; } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java ---------------------------------------------------------------------- diff --git a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java index 70a6dfb..95d29c0 100644 --- a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java +++ b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java @@ -84,7 +84,9 @@ public class YarnResourceMgr { final YarnClient yarnClient = YarnConnectionMgr.getYarnClient(url, username, password); List<String> topologyList = null; if (yarnClient != null) { - topologyList = yarnClient.getQueueList(yarnQueueName, yarnQueueList); + synchronized(yarnClient) { + topologyList = yarnClient.getQueueList(yarnQueueName, yarnQueueList); + } } return topologyList; } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e3f0f41d/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java ---------------------------------------------------------------------- diff --git a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java index c572898..52c7401 100644 --- a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java +++ b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java @@ -72,7 +72,6 @@ public class StormResourceMgr { LOG.error("Connection Config is empty"); } else { - String url = configs.get("nimbus.url"); String username = configs.get("username"); String password = configs.get("password"); @@ -82,12 +81,15 @@ public class StormResourceMgr { } public static List<String> getStormResources(String url, String username, String password,String topologyName, List<String> StormTopologyList) { + List<String> topologyList = null; final StormClient stormClient = StormConnectionMgr.getStormClient(url, username, password); if (stormClient == null) { LOG.error("Storm Client is null"); return new ArrayList<String>(); } - List<String> topologyList = stormClient.getTopologyList(topologyName,StormTopologyList) ; + synchronized(stormClient){ + topologyList = stormClient.getTopologyList(topologyName,StormTopologyList) ; + } return topologyList; }
