Repository: incubator-ranger Updated Branches: refs/heads/master 2e0be82df -> dc87ef986
RANGER-225: Ranger-LookupResource and ValidateConfig implementation for all components in the new pluggable model - Yarn Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/29fc2708 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/29fc2708 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/29fc2708 Branch: refs/heads/master Commit: 29fc2708263d76bffacb0ccfe86f605bcdf0a79b Parents: 03d1dec Author: rmani <[email protected]> Authored: Fri Mar 13 23:15:53 2015 -0700 Committer: rmani <[email protected]> Committed: Fri Mar 13 23:15:53 2015 -0700 ---------------------------------------------------------------------- .../ranger/services/yarn/RangerServiceYarn.java | 73 +++++ .../ranger/services/yarn/client/YarnClient.java | 287 +++++++++++++++++++ .../services/yarn/client/YarnConnectionMgr.java | 43 +++ .../services/yarn/client/YarnResourceMgr.java | 89 ++++++ .../json/model/YarnSchedulerResponse.java | 107 +++++++ 5 files changed, 599 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/29fc2708/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/RangerServiceYarn.java ---------------------------------------------------------------------- diff --git a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/RangerServiceYarn.java b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/RangerServiceYarn.java new file mode 100644 index 0000000..5669c5d --- /dev/null +++ b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/RangerServiceYarn.java @@ -0,0 +1,73 @@ +package org.apache.ranger.services.yarn; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.ranger.plugin.model.RangerService; +import org.apache.ranger.plugin.model.RangerServiceDef; +import org.apache.ranger.plugin.service.RangerBaseService; +import org.apache.ranger.plugin.service.ResourceLookupContext; +import org.apache.ranger.services.yarn.client.YarnResourceMgr; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class RangerServiceYarn extends RangerBaseService { + + private static final Log LOG = LogFactory.getLog(RangerServiceYarn.class); + + public RangerServiceYarn() { + super(); + } + + @Override + public void init(RangerServiceDef serviceDef, RangerService service) { + super.init(serviceDef, service); + } + + @Override + public HashMap<String,Object> validateConfig() throws Exception { + HashMap<String, Object> ret = new HashMap<String, Object>(); + String serviceName = getServiceName(); + if(LOG.isDebugEnabled()) { + LOG.debug("==> RangerServiceYarn.validateConfig Service: (" + serviceName + " )"); + } + if ( configs != null) { + try { + ret = YarnResourceMgr.validateConfig(serviceName, configs); + } catch (Exception e) { + LOG.error("<== RangerServiceYarn.validateConfig Error:" + e); + throw e; + } + } + if(LOG.isDebugEnabled()) { + LOG.debug("<== RangerServiceYarn.validateConfig Response : (" + ret + " )"); + } + return ret; + } + + @Override + public List<String> lookupResource(ResourceLookupContext context) throws Exception { + + List<String> ret = new ArrayList<String>(); + String serviceName = getServiceName(); + Map<String,String> configs = getConfigs(); + if(LOG.isDebugEnabled()) { + LOG.debug("==> RangerServiceYarn.lookupResource Context: (" + context + ")"); + } + if (context != null) { + try { + ret = YarnResourceMgr.getYarnResources(serviceName,configs,context); + } catch (Exception e) { + LOG.error( "<==RangerServiceYarn.lookupResource Error : " + e); + throw e; + } + } + if(LOG.isDebugEnabled()) { + LOG.debug("<== RangerServiceYarn.lookupResource Response: (" + ret + ")"); + } + return ret; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/29fc2708/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnClient.java ---------------------------------------------------------------------- diff --git a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnClient.java b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnClient.java new file mode 100644 index 0000000..aff04ed --- /dev/null +++ b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnClient.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.services.yarn.client; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + + +import org.apache.commons.io.FilenameUtils; +import org.apache.log4j.Logger; +import org.apache.ranger.plugin.client.BaseClient; +import org.apache.ranger.plugin.client.HadoopException; +import org.apache.ranger.services.yarn.client.YarnClient; +import org.apache.ranger.services.yarn.client.json.model.YarnSchedulerResponse; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +public class YarnClient { + + public static final Logger LOG = Logger.getLogger(YarnClient.class) ; + + private static final String EXPECTED_MIME_TYPE = "application/json"; + + private static final String YARN_LIST_API_ENDPOINT = "/ws/v1/cluster/scheduler" ; + + private static final String errMessage = " You can still save the repository and start creating " + + "policies, but you would not be able to use autocomplete for " + + "resource names. Check xa_portal.log for more info."; + + + String yarnQUrl; + String userName; + String password; + + public YarnClient(String yarnQueueUrl, String yarnUserName, String yarnPassWord) { + + this.yarnQUrl = yarnQueueUrl; + this.userName = yarnUserName ; + this.password = yarnPassWord; + + if (LOG.isDebugEnabled()) { + LOG.debug("Yarn Client is build with url [" + yarnQueueUrl + "] user: [" + yarnPassWord + "], password: [" + "" + "]"); + } + + } + + public List<String> getQueueList(final String queueNameMatching, final List<String> existingQueueList) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting Yarn queue list for queueNameMatching : " + queueNameMatching); + } + final String errMsg = errMessage; + + List<String> ret = new ArrayList<String>(); + + Callable<List<String>> yarnQueueListGetter = new Callable<List<String>>() { + @Override + public List<String> call() { + + List<String> lret = new ArrayList<String>(); + + String url = yarnQUrl + YARN_LIST_API_ENDPOINT ; + + Client client = null ; + ClientResponse response = null ; + + try { + client = Client.create() ; + + WebResource webResource = client.resource(url); + + response = webResource.accept(EXPECTED_MIME_TYPE) + .get(ClientResponse.class); + + if (LOG.isDebugEnabled()) { + LOG.debug("getQueueList():calling " + url); + } + + if (response != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("getQueueList():response.getStatus()= " + response.getStatus()); + } + if (response.getStatus() == 200) { + String jsonString = response.getEntity(String.class); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + YarnSchedulerResponse yarnQResponse = gson.fromJson(jsonString, YarnSchedulerResponse.class); + if (yarnQResponse != null) { + List<String> yarnQueueList = yarnQResponse.getQueueNames(); + if (yarnQueueList != null) { + for ( String yarnQueueName : yarnQueueList) { + if ( existingQueueList != null && existingQueueList.contains(yarnQueueName)) { + continue; + } + if (queueNameMatching == null || queueNameMatching.isEmpty() + || yarnQueueName.startsWith(queueNameMatching)) { + if (LOG.isDebugEnabled()) { + LOG.debug("getQueueList():Adding yarnQueue " + yarnQueueName); + } + lret.add(yarnQueueName) ; + } + } + } + } + } else{ + LOG.info("getQueueList():response.getStatus()= " + response.getStatus() + " for URL " + url + ", so returning null list"); + String jsonString = response.getEntity(String.class); + LOG.info(jsonString); + lret = null; + } + } else { + String msgDesc = "Unable to get a valid response for " + + "expected mime type : [" + EXPECTED_MIME_TYPE + + "] URL : " + url + " - got null response."; + LOG.error(msgDesc); + HadoopException hdpException = new HadoopException(msgDesc); + hdpException.generateResponseDataMap(false, msgDesc, + msgDesc + errMsg, null, null); + throw hdpException; + } + } catch (HadoopException he) { + throw he; + } catch (Throwable t) { + String msgDesc = "Exception while getting Yarn Queue List." + + " URL : " + url; + HadoopException hdpException = new HadoopException(msgDesc, + t); + LOG.error(msgDesc, t); + + hdpException.generateResponseDataMap(false, + BaseClient.getMessage(t), msgDesc + errMsg, null, + null); + throw hdpException; + + } finally { + if (response != null) { + response.close(); + } + + if (client != null) { + client.destroy(); + } + + } + return lret ; + } + } ; + + try { + ret = timedTask(yarnQueueListGetter, 5, TimeUnit.SECONDS); + } catch ( Exception e) { + LOG.error("Unable to get Yarn Queue list from [" + yarnQUrl + "]", e) ; + } + + return ret; + } + + + + + + public static HashMap<String, Object> testConnection(String serviceName, + Map<String, String> configs) { + + List<String> strList = new ArrayList<String>(); + String errMsg = errMessage; + boolean connectivityStatus = false; + HashMap<String, Object> responseData = new HashMap<String, Object>(); + + YarnClient yarnClient = getYarnClient(serviceName, + configs); + strList = getYarnResource(yarnClient, "",null); + + if (strList != null) { + connectivityStatus = true; + } + + if (connectivityStatus) { + String successMsg = "TestConnection Successful"; + BaseClient.generateResponseDataMap(connectivityStatus, successMsg, + successMsg, null, null, responseData); + } else { + String failureMsg = "Unable to retrieve any Yarn Queues using given parameters."; + BaseClient.generateResponseDataMap(connectivityStatus, failureMsg, + failureMsg + errMsg, null, null, responseData); + } + + return responseData; + } + + public static YarnClient getYarnClient(String serviceName, + Map<String, String> configs) { + YarnClient yarnClient = null; + if (LOG.isDebugEnabled()) { + LOG.debug("Getting YarnClient for datasource: " + serviceName + + "configMap: " + configs); + } + String errMsg = errMessage; + if (configs == null || configs.isEmpty()) { + String msgDesc = "Could not connect as Connection ConfigMap is empty."; + LOG.error(msgDesc); + HadoopException hdpException = new HadoopException(msgDesc); + hdpException.generateResponseDataMap(false, msgDesc, msgDesc + + errMsg, null, null); + throw hdpException; + } else { + String yarnUrl = configs.get("yarn.url"); + String yarnUserName = configs.get("username"); + String yarnPassWord = configs.get("password"); + yarnClient = new YarnClient (yarnUrl, yarnUserName, + yarnPassWord); + + } + return yarnClient; + } + + public static List<String> getYarnResource (final YarnClient yarnClient, + String yanrQname, List<String> existingQueueName) { + + List<String> resultList = new ArrayList<String>(); + String errMsg = errMessage; + + try { + if (yarnClient == null) { + String msgDesc = "Unable to get Yarn Queue : YarnClient is null."; + LOG.error(msgDesc); + HadoopException hdpException = new HadoopException(msgDesc); + hdpException.generateResponseDataMap(false, msgDesc, msgDesc + + errMsg, null, null); + throw hdpException; + } + + if (yanrQname != null) { + String finalyarnQueueName = (yanrQname == null) ? "" + : yanrQname.trim(); + resultList = yarnClient + .getQueueList(finalyarnQueueName,existingQueueName); + if (resultList != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Returning list of " + resultList.size() + " Yarn Queues"); + } + } + } + } catch (HadoopException he) { + throw he; + } catch (Exception e) { + String msgDesc = "getYarnResource: Unable to get Yarn resources."; + LOG.error(msgDesc, e); + HadoopException hdpException = new HadoopException(msgDesc); + + hdpException.generateResponseDataMap(false, + BaseClient.getMessage(e), msgDesc + errMsg, null, null); + throw hdpException; + } + return resultList; + } + + public static <T> T timedTask(Callable<T> callableObj, long timeout, + TimeUnit timeUnit) throws Exception { + return callableObj.call(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/29fc2708/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnConnectionMgr.java ---------------------------------------------------------------------- diff --git a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnConnectionMgr.java b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnConnectionMgr.java new file mode 100644 index 0000000..e2cc2ef --- /dev/null +++ b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnConnectionMgr.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.services.yarn.client; + +import org.apache.log4j.Logger; + + +public class YarnConnectionMgr { + + public static final Logger LOG = Logger.getLogger(YarnConnectionMgr.class); + + public static YarnClient getYarnClient(final String yarnURL, String userName, String password) { + YarnClient yarnClient = null; + if (yarnURL == null || yarnURL.isEmpty()) { + LOG.error("Can not create YarnClient: yarnURL is empty"); + } else if (userName == null || userName.isEmpty()) { + LOG.error("Can not create YarnClient: YarnuserName is empty"); + } else if (password == null || password.isEmpty()) { + LOG.error("Can not create YarnClient: YarnPassWord is empty"); + } else { + yarnClient = new YarnClient(yarnURL, userName, password); + } + return yarnClient; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/29fc2708/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java ---------------------------------------------------------------------- diff --git a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java new file mode 100644 index 0000000..35d95e6 --- /dev/null +++ b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/YarnResourceMgr.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.services.yarn.client; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.ranger.plugin.service.ResourceLookupContext; + +public class YarnResourceMgr { + public static final Logger LOG = Logger.getLogger(YarnResourceMgr.class); + private static final String YARNQUEUE = "queue"; + + public static HashMap<String, Object> validateConfig(String serviceName, Map<String, String> configs) throws Exception { + HashMap<String, Object> ret = null; + + if(LOG.isDebugEnabled()) { + LOG.debug("==> YarnResourceMgr.validateConfig ServiceName: "+ serviceName + "Configs" + configs ) ; + } + + try { + ret = YarnClient.testConnection(serviceName, configs); + } catch (Exception e) { + LOG.error("<== YarnResourceMgr.validateConfig Error: " + e) ; + throw e; + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== YarnResourceMgr.validateConfig Result : "+ ret ) ; + } + return ret; + } + + public static List<String> getYarnResources(String serviceName, Map<String, String> configs,ResourceLookupContext context) { + String userInput = context.getUserInput(); + String resource = context.getResourceName(); + Map<String, List<String>> resourceMap = context.getResources(); + List<String> resultList = null; + List<String> yarnQueueList = null; + String yarnQueueName = null; + + if ( resourceMap != null && !resourceMap.isEmpty() && + resourceMap.get(YARNQUEUE) != null ) { + yarnQueueName = userInput; + yarnQueueList = resourceMap.get(YARNQUEUE); + } else { + yarnQueueName = userInput; + } + + + if (configs == null || configs.isEmpty()) { + LOG.error("Connection Config is empty"); + + } else { + + String url = configs.get("yarn.url"); + String username = configs.get("username"); + String password = configs.get("password"); + resultList = getYarnResource(url, username, password,yarnQueueName,yarnQueueList) ; + } + return resultList ; + } + + public static List<String> getYarnResource(String url, String username, String password,String yarnQueueName, List<String> yarnQueueList) { + final YarnClient yarnClient = YarnConnectionMgr.getYarnClient(url, username, password); + List<String> topologyList = yarnClient.getQueueList(yarnQueueName, yarnQueueList); + return topologyList; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/29fc2708/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/json/model/YarnSchedulerResponse.java ---------------------------------------------------------------------- diff --git a/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/json/model/YarnSchedulerResponse.java b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/json/model/YarnSchedulerResponse.java new file mode 100644 index 0000000..c2252d6 --- /dev/null +++ b/plugin-yarn/src/main/java/org/apache/ranger/services/yarn/client/json/model/YarnSchedulerResponse.java @@ -0,0 +1,107 @@ +package org.apache.ranger.services.yarn.client.json.model; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + + + +@JsonAutoDetect(getterVisibility=Visibility.NONE, setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL ) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class YarnSchedulerResponse implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private YarnScheduler scheduler = null; + + public YarnScheduler getScheduler() { return scheduler; } + + public List<String> getQueueNames() { + List<String> ret = new ArrayList<String>(); + + if(scheduler != null) { + scheduler.collectQueueNames(ret); + } + + return ret; + } + + + @JsonAutoDetect(getterVisibility=Visibility.NONE, setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY) + @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL ) + @JsonIgnoreProperties(ignoreUnknown=true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class YarnScheduler implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private YarnSchedulerInfo schedulerInfo = null; + + public YarnSchedulerInfo getSchedulerInfo() { return schedulerInfo; } + + public void collectQueueNames(List<String> queueNames) { + if(schedulerInfo != null) { + schedulerInfo.collectQueueNames(queueNames, null); + } + } + } + + @JsonAutoDetect(getterVisibility=Visibility.NONE, setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY) + @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL ) + @JsonIgnoreProperties(ignoreUnknown=true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class YarnSchedulerInfo implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private String queueName = null; + private YarnQueues queues = null; + + public String getQueueName() { return queueName; } + + public YarnQueues getQueues() { return queues; } + + public void collectQueueNames(List<String> queueNames, String parentQueueName) { + if(queueName != null) { + String queueFqdn = parentQueueName == null ? queueName : parentQueueName + "." + queueName; + + queueNames.add(queueFqdn); + + if(queues != null) { + queues.collectQueueNames(queueNames, queueFqdn); + } + } + } + } + + @JsonAutoDetect(getterVisibility=Visibility.NONE, setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY) + @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL ) + @JsonIgnoreProperties(ignoreUnknown=true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class YarnQueues implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private List<YarnSchedulerInfo> queue = null; + + public List<YarnSchedulerInfo> getQueue() { return queue; } + + public void collectQueueNames(List<String> queueNames, String parentQueueName) { + if(queue != null) { + for(YarnSchedulerInfo schedulerInfo : queue) { + schedulerInfo.collectQueueNames(queueNames, parentQueueName); + } + } + } + } +}
