Repository: incubator-ranger
Updated Branches:
  refs/heads/stack 432b4b11d -> a67360d26


Ranger-225:Ranger-LookupResource and ValidateConfig implementation for all 
components in the new pluggable model - Storm


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a67360d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a67360d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a67360d2

Branch: refs/heads/stack
Commit: a67360d26a350dcf2d59fa09ae48f42ca3ca3f95
Parents: 432b4b1
Author: rmani <[email protected]>
Authored: Thu Feb 12 19:39:08 2015 -0800
Committer: rmani <[email protected]>
Committed: Thu Feb 12 19:39:08 2015 -0800

----------------------------------------------------------------------
 .../services/storm/RangerServiceStorm.java      |  98 +++++
 .../services/storm/client/StormClient.java      | 392 +++++++++++++++++++
 .../storm/client/StormConnectionMgr.java        |  43 ++
 .../services/storm/client/StormResourceMgr.java |  70 ++++
 .../storm/client/json/model/Topology.java       |  47 +++
 .../client/json/model/TopologyListResponse.java |  38 ++
 6 files changed, 688 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a67360d2/storm-agent/src/main/java/org/apache/ranger/services/storm/RangerServiceStorm.java
----------------------------------------------------------------------
diff --git 
a/storm-agent/src/main/java/org/apache/ranger/services/storm/RangerServiceStorm.java
 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/RangerServiceStorm.java
new file mode 100644
index 0000000..2ff0185
--- /dev/null
+++ 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/RangerServiceStorm.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.services.storm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ranger.plugin.model.RangerService;
+import org.apache.ranger.plugin.model.RangerServiceDef;
+import org.apache.ranger.plugin.service.RangerBaseService;
+import org.apache.ranger.plugin.service.ResourceLookupContext;
+import org.apache.ranger.services.storm.client.StormResourceMgr;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class RangerServiceStorm extends RangerBaseService {
+
+       private static final Log LOG = 
LogFactory.getLog(RangerServiceStorm.class);
+       
+       Map<String, String> configs;
+       String                      service;
+       
+       public RangerServiceStorm() {
+               super();
+       }
+       
+       @Override
+       public void init(RangerServiceDef serviceDef, RangerService service) {
+               super.init(serviceDef, service);
+               init();
+       }
+
+       @Override
+       public HashMap<String,Object> validateConfig() throws Exception {
+               HashMap<String, Object> ret = new HashMap<String, Object>();
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerServiceStorm.validateConfig 
Service: (" + service + " )");
+               }
+               if ( configs != null) {
+                       try  {
+                               ret = StormResourceMgr.validateConfig(service, 
configs);
+                       } catch (Exception e) {
+                               LOG.error("<== RangerServiceKnox.validateConfig 
Error:" + e);
+                               throw e;
+                       }
+               }
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerServiceKnox.validateConfig 
Response : (" + ret + " )");
+               }
+               return ret;
+       }
+
+       @Override
+       public List<String> lookupResource(ResourceLookupContext context) 
throws Exception {
+               
+               List<String> ret = new ArrayList<String>();
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerServiceKnox.lookupResource 
Context: (" + context + ")");
+               }
+               if (context != null) {
+                       try {
+                               ret  = 
StormResourceMgr.getStormResources(service,configs,context);
+                                               
+                       } catch (Exception e) {
+                         LOG.error( "<==RangerServiceKnox.lookupResource Error 
: " + e);
+                         throw e;
+                       }
+               }
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerServiceKnox.lookupResource 
Response: (" + ret + ")");
+               }
+               return ret;
+       }
+       
+       public void init() {
+               service  = getService().getName();
+               configs  = getService().getConfigs();
+       }
+       
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a67360d2/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
----------------------------------------------------------------------
diff --git 
a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
new file mode 100644
index 0000000..88303e7
--- /dev/null
+++ 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.services.storm.client;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.security.KrbPasswordSaverLoginModule;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.log4j.Logger;
+import org.apache.ranger.plugin.client.BaseClient;
+import org.apache.ranger.plugin.client.HadoopException;
+import org.apache.ranger.services.storm.client.json.model.Topology;
+import org.apache.ranger.services.storm.client.json.model.TopologyListResponse;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+public class StormClient {
+       
+       public static final Logger LOG = Logger.getLogger(StormClient.class) ;
+
+       private static final String EXPECTED_MIME_TYPE = "application/json";
+       
+       private static final String TOPOLOGY_LIST_API_ENDPOINT = 
"/api/v1/topology/summary" ;
+       
+       private static final String errMessage =  " You can still save the 
repository and start creating "
+                                                                               
          + "policies, but you would not be able to use autocomplete for "
+                                                                               
          + "resource names. Check xa_portal.log for more info.";
+
+       String stormUIUrl;
+       String userName;
+       String password;
+
+       public StormClient(String aStormUIUrl, String aUserName, String 
aPassword) {
+               
+               this.stormUIUrl = aStormUIUrl;
+               this.userName = aUserName ;
+               this.password = aPassword;
+               
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Storm Client is build with url [" + 
aStormUIUrl + "] user: [" + aUserName + "], password: [" + "" + "]");
+               }
+
+       }
+
+       public List<String> getTopologyList(final String topologyNameMatching, 
final List<String> stormTopologyList) {
+               
+               LOG.debug("Getting Storm topology list for topologyNameMatching 
: " +
+                               topologyNameMatching);
+               final String errMsg = errMessage;
+               
+               List<String> ret = new ArrayList<String>();
+               
+               PrivilegedAction<ArrayList<String>> topologyListGetter = new 
PrivilegedAction<ArrayList<String>>() {
+                       @Override
+                       public ArrayList<String> run() {
+                               
+                               ArrayList<String> lret = new 
ArrayList<String>();
+                               
+                               String url = stormUIUrl + 
TOPOLOGY_LIST_API_ENDPOINT ;
+                               
+                               Client client = null ;
+                               ClientResponse response = null ;
+                               
+                               try {
+                                       client = Client.create() ;
+                                       
+                                       WebResource webResource = 
client.resource(url);
+                                       
+                                       response = 
webResource.accept(EXPECTED_MIME_TYPE)
+                                                   .get(ClientResponse.class);
+                                       
+                                       LOG.debug("getTopologyList():calling " 
+ url);
+                                       
+                                       if (response != null) {
+                                               
LOG.debug("getTopologyList():response.getStatus()= " + response.getStatus());   
+                                               if (response.getStatus() == 
200) {
+                                                       String jsonString = 
response.getEntity(String.class);
+                                                       Gson gson = new 
GsonBuilder().setPrettyPrinting().create();
+                                                       TopologyListResponse 
topologyListResponse = gson.fromJson(jsonString, TopologyListResponse.class);
+                                                       if 
(topologyListResponse != null) {
+                                                               if 
(topologyListResponse.getTopologyList() != null) {
+                                                                       
for(Topology topology : topologyListResponse.getTopologyList()) {
+                                                                               
String topologyName = topology.getName() ;
+                                                                               
if ( stormTopologyList != null && stormTopologyList.contains(topologyName)) {
+                                                                               
continue;
+                                                                       }
+                                                                               
LOG.debug("getTopologyList():Found topology " + topologyName);
+                                                                               
LOG.debug("getTopologyList():topology Name=[" + topology.getName() + "], 
topologyNameMatching=[" + topologyNameMatching + "], 
existingStormTopologyList=[" + stormTopologyList + "]");
+                                                                               
if (topologyName != null) {
+                                                                               
        if (topologyNameMatching == null || topologyNameMatching.isEmpty() || 
FilenameUtils.wildcardMatch(topology.getName(), topologyNameMatching + "*")) {
+                                                                               
                LOG.debug("getTopologyList():Adding topology " + topologyName);
+                                                                               
                lret.add(topologyName) ;
+                                                                               
        }
+                                                                               
}
+                                                                       }
+                                                               }
+                                                       }
+                                               } else{
+                                                       
LOG.info("getTopologyList():response.getStatus()= " + response.getStatus() + " 
for URL " + url + ", so returning null list");   
+                                                       String jsonString = 
response.getEntity(String.class);
+                                                       LOG.info(jsonString);
+                                                       lret = null;
+                                               }
+                                       } else {
+                                               String msgDesc = "Unable to get 
a valid response for "
+                                                               + "expected 
mime type : [" + EXPECTED_MIME_TYPE
+                                                               + "] URL : " + 
url + " - got null response.";
+                                               LOG.error(msgDesc);
+                                               HadoopException hdpException = 
new HadoopException(msgDesc);
+                                               
hdpException.generateResponseDataMap(false, msgDesc,
+                                                               msgDesc + 
errMsg, null, null);
+                                               throw hdpException;
+                                       }
+                               } catch (HadoopException he) {
+                                       throw he;
+                               } catch (Throwable t) {
+                                       String msgDesc = "Exception while 
getting Storm TopologyList."
+                                                       + " URL : " + url;
+                                       HadoopException hdpException = new 
HadoopException(msgDesc,
+                                                       t);
+                                       LOG.error(msgDesc, t);
+
+                                       
hdpException.generateResponseDataMap(false,
+                                                       
BaseClient.getMessage(t), msgDesc + errMsg, null,
+                                                       null);
+                                       throw hdpException;
+                                       
+                               } finally {
+                                       if (response != null) {
+                                               response.close();
+                                       }
+                                       
+                                       if (client != null) {
+                                               client.destroy(); 
+                                       }
+                               
+                               }
+                               return lret ;
+                       }
+               } ;
+               
+               try {
+                       ret = executeUnderKerberos(this.userName, 
this.password, topologyListGetter) ;
+               } catch (IOException e) {
+                       LOG.error("Unable to get Topology list from [" + 
stormUIUrl + "]", e) ;
+               }
+               
+               return ret;
+       }
+       
+       public static <T> T executeUnderKerberos(String userName, String 
password,
+                       PrivilegedAction<T> action) throws IOException {
+               
+               final String errMsg = errMessage;
+               class MySecureClientLoginConfiguration extends
+                               javax.security.auth.login.Configuration {
+
+                       private String userName;
+                       private String password ;
+
+                       MySecureClientLoginConfiguration(String aUserName,
+                                       String password) {
+                               this.userName = aUserName;
+                               this.password = password;
+                       }
+
+                       @Override
+                       public AppConfigurationEntry[] getAppConfigurationEntry(
+                                       String appName) {
+
+                               Map<String, String> kerberosOptions = new 
HashMap<String, String>();
+                               kerberosOptions.put("principal", this.userName);
+                               kerberosOptions.put("debug", "true");
+                               kerberosOptions.put("useKeyTab", "false");
+                               
kerberosOptions.put(KrbPasswordSaverLoginModule.USERNAME_PARAM, this.userName);
+                               
kerberosOptions.put(KrbPasswordSaverLoginModule.PASSWORD_PARAM, this.password);
+                               kerberosOptions.put("doNotPrompt", "false");
+                               kerberosOptions.put("useFirstPass", "true");
+                               kerberosOptions.put("tryFirstPass", "false");
+                               kerberosOptions.put("storeKey", "true");
+                               kerberosOptions.put("refreshKrb5Config", 
"true");
+
+                               AppConfigurationEntry KEYTAB_KERBEROS_LOGIN = 
null;
+                               AppConfigurationEntry KERBEROS_PWD_SAVER = null;
+                               try {
+                                       KEYTAB_KERBEROS_LOGIN = new 
AppConfigurationEntry(
+                                                       
KerberosUtil.getKrb5LoginModuleName(),
+                                                       
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                                       kerberosOptions);
+                                       KERBEROS_PWD_SAVER = new 
AppConfigurationEntry(KrbPasswordSaverLoginModule.class.getName(), 
LoginModuleControlFlag.REQUIRED, kerberosOptions);
+
+                               } catch (IllegalArgumentException e) {
+                                       String msgDesc = "executeUnderKerberos: 
Exception while getting Storm TopologyList.";
+                                       HadoopException hdpException = new 
HadoopException(msgDesc,
+                                                       e);
+                                       LOG.error(msgDesc, e);
+
+                                       
hdpException.generateResponseDataMap(false,
+                                                       
BaseClient.getMessage(e), msgDesc + errMsg, null,
+                                                       null);
+                                       throw hdpException;
+                               }
+                
+                               LOG.debug("getAppConfigurationEntry():" + 
kerberosOptions.get("principal"));
+                               
+                return new AppConfigurationEntry[] { KERBEROS_PWD_SAVER, 
KEYTAB_KERBEROS_LOGIN };
+                       }
+
+               };
+
+               T ret = null;
+
+               Subject subject = null;
+               LoginContext loginContext = null;
+
+               try {
+                   subject = new Subject();
+                       LOG.debug("executeUnderKerberos():user=" + userName + 
",pass=");
+                       LOG.debug("executeUnderKerberos():Creating config..");
+                       MySecureClientLoginConfiguration loginConf = new 
MySecureClientLoginConfiguration(
+                                       userName, password);
+                       LOG.debug("executeUnderKerberos():Creating Context..");
+                       loginContext = new 
LoginContext("hadoop-keytab-kerberos", subject,
+                                       null, loginConf);
+                       
+                       LOG.debug("executeUnderKerberos():Logging in..");
+                       loginContext.login();
+
+                       Subject loginSubj = loginContext.getSubject();
+
+                       if (loginSubj != null) {
+                               ret = Subject.doAs(loginSubj, action);
+                       }
+               } catch (LoginException le) {
+                       String msgDesc = "executeUnderKerberos: Login failure 
using given"
+                                       + " configuration parameters, username 
: `" + userName + "`.";
+                       HadoopException hdpException = new 
HadoopException(msgDesc, le);
+                       LOG.error(msgDesc, le);
+
+                       hdpException.generateResponseDataMap(false,
+                                       BaseClient.getMessage(le), msgDesc + 
errMsg, null, null);
+                       throw hdpException;
+               } catch (SecurityException se) {
+                       String msgDesc = "executeUnderKerberos: Exception while 
getting Storm TopologyList.";
+                       HadoopException hdpException = new 
HadoopException(msgDesc, se);
+                       LOG.error(msgDesc, se);
+
+                       hdpException.generateResponseDataMap(false,
+                                       BaseClient.getMessage(se), msgDesc + 
errMsg, null, null);
+                       throw hdpException;
+
+               } finally {
+                       if (loginContext != null) {
+                               if (subject != null) {
+                                       try {
+                                               loginContext.logout();
+                                       } catch (LoginException e) {
+                                               throw new IOException("logout 
failure", e);
+                                       }
+                               }
+                       }
+               }
+
+               return ret;
+       }
+
+       public static HashMap<String, Object> testConnection(String serviceName,
+                       Map<String, String> configs) {
+
+               List<String> strList = new ArrayList<String>();
+               String errMsg = errMessage;
+               boolean connectivityStatus = false;
+               HashMap<String, Object> responseData = new HashMap<String, 
Object>();
+
+               StormClient stormClient = getStormClient(serviceName,
+                               configs);
+               strList = getStormResources(stormClient, "",null);
+
+               if (strList != null) {
+                       connectivityStatus = true;
+               }
+
+               if (connectivityStatus) {
+                       String successMsg = "TestConnection Successful";
+                       BaseClient.generateResponseDataMap(connectivityStatus, 
successMsg,
+                                       successMsg, null, null, responseData);
+               } else {
+                       String failureMsg = "Unable to retrieve any topologies 
using given parameters.";
+                       BaseClient.generateResponseDataMap(connectivityStatus, 
failureMsg,
+                                       failureMsg + errMsg, null, null, 
responseData);
+               }
+
+               return responseData;
+       }
+
+       public static StormClient getStormClient(String serviceName,
+                       Map<String, String> configs) {
+               StormClient stormClient = null;
+               LOG.debug("Getting StormClient for datasource: " + serviceName
+                               + "configMap: " + configs);
+               String errMsg = errMessage;
+               if (configs == null || configs.isEmpty()) {
+                       String msgDesc = "Could not connect as Connection 
ConfigMap is empty.";
+                       LOG.error(msgDesc);
+                       HadoopException hdpException = new 
HadoopException(msgDesc);
+                       hdpException.generateResponseDataMap(false, msgDesc, 
msgDesc
+                                       + errMsg, null, null);
+                       throw hdpException;
+               } else {
+                       String stormUrl = configs.get("nimbus.url");
+                       String stormAdminUser = configs.get("username");
+                       String stormAdminPassword = configs.get("password");
+                       stormClient = new StormClient(stormUrl, stormAdminUser,
+                                       stormAdminPassword);
+               }
+               return stormClient;
+       }
+
+       public static List<String> getStormResources(final StormClient 
stormClient,
+                       String topologyName, List<String> stormTopologyList) {
+
+               List<String> resultList = new ArrayList<String>();
+               String errMsg = errMessage;
+
+               try {
+                       if (stormClient == null) {
+                               String msgDesc = "Unable to get Storm 
resources: StormClient is null.";
+                               LOG.error(msgDesc);
+                               HadoopException hdpException = new 
HadoopException(msgDesc);
+                               hdpException.generateResponseDataMap(false, 
msgDesc, msgDesc
+                                               + errMsg, null, null);
+                               throw hdpException;
+                       }
+
+                       if (topologyName != null) {
+                               String finalTopologyNameMatching = 
(topologyName == null) ? ""
+                                               : topologyName.trim();
+                               resultList = stormClient
+                                               
.getTopologyList(finalTopologyNameMatching,stormTopologyList);
+                               if (resultList != null) {
+                                       LOG.debug("Returning list of " + 
resultList.size() + " topologies");
+                               }
+                       }
+               } catch (HadoopException he) {
+                       throw he;
+               } catch (Exception e) {
+                       String msgDesc = "getStormResources: Unable to get 
Storm resources.";
+                       LOG.error(msgDesc, e);
+                       HadoopException hdpException = new 
HadoopException(msgDesc);
+
+                       hdpException.generateResponseDataMap(false,
+                                       BaseClient.getMessage(e), msgDesc + 
errMsg, null, null);
+                       throw hdpException;
+               }
+               return resultList;
+       }
+       
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a67360d2/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormConnectionMgr.java
----------------------------------------------------------------------
diff --git 
a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormConnectionMgr.java
 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormConnectionMgr.java
new file mode 100644
index 0000000..5d008e7
--- /dev/null
+++ 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormConnectionMgr.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.services.storm.client;
+
+import org.apache.log4j.Logger;
+
+
+public class StormConnectionMgr {
+
+       public static final Logger LOG = 
Logger.getLogger(StormConnectionMgr.class);
+    
+       public static StormClient getStormClient(final String stormUIURL, 
String userName, String password) {
+        StormClient stormClient = null;
+        if (stormUIURL == null || stormUIURL.isEmpty()) {
+               LOG.error("Can not create StormClient: stormUIURL is empty");
+        } else if (userName == null || userName.isEmpty()) {
+               LOG.error("Can not create StormClient: stormAdminUser is 
empty");
+        } else if (password == null || password.isEmpty()) {
+               LOG.error("Can not create StormClient: stormAdminPassword is 
empty");
+        } else {
+            stormClient =  new StormClient(stormUIURL, userName, password);
+        }
+        return stormClient;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a67360d2/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java
----------------------------------------------------------------------
diff --git 
a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java
 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java
new file mode 100644
index 0000000..d232a54
--- /dev/null
+++ 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormResourceMgr.java
@@ -0,0 +1,70 @@
+package org.apache.ranger.services.storm.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.ranger.plugin.service.ResourceLookupContext;
+
+public class StormResourceMgr {
+       public static final     Logger  LOG             = 
Logger.getLogger(StormResourceMgr.class);
+       private static final    String  TOPOLOGY        = "topology";
+       
+       public static HashMap<String, Object> validateConfig(String 
serviceName, Map<String, String> configs) throws Exception {
+               HashMap<String, Object> ret = null;
+               
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> StormResourceMgr.validateConfig 
ServiceName: "+ serviceName + "Configs" + configs ) ;
+               }       
+               
+               try {
+                       ret = StormClient.testConnection(serviceName, configs);
+               } catch (Exception e) {
+                       LOG.error("<== StormResourceMgr.validateConfig Error: " 
+ e) ;
+                 throw e;
+               }
+               
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== StormResourceMgr.validateConfig Result : 
"+ ret  ) ;
+               }       
+               return ret;
+       }
+       
+    public static List<String> getStormResources(String serviceName, 
Map<String, String> configs,ResourceLookupContext context) {
+        String                  userInput                                = 
context.getUserInput();
+               String           resource                                 = 
context.getResourceName();
+               Map<String, List<String>> resourceMap = context.getResources();
+           List<String>                resultList        = null;
+               List<String>            StormTopologyList = null;
+               String                          StromTopologyName = null;
+               
+               if ( resourceMap != null && !resourceMap.isEmpty() &&
+                       resourceMap.get(TOPOLOGY) != null ) {
+                       StromTopologyName = userInput;
+                       StormTopologyList = resourceMap.get(TOPOLOGY); 
+               } else {
+                       StromTopologyName = userInput;
+               }
+               
+               
+        if (configs == null || configs.isEmpty()) {
+                LOG.error("Connection Config is empty");
+
+        } else {
+                
+                String url             = configs.get("nimbus.url");
+                String username = configs.get("username");
+                String password = configs.get("password");
+                resultList = getStormResources(url, username, 
password,StromTopologyName,StormTopologyList) ;
+        }
+        return resultList ;
+    }
+
+    public static List<String> getStormResources(String url, String username, 
String password,String topologyName, List<String> StormTopologyList) {
+        final StormClient stormClient = StormConnectionMgr.getStormClient(url, 
username, password);
+        List<String> topologyList = 
stormClient.getTopologyList(topologyName,StormTopologyList) ;
+        return topologyList;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a67360d2/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/Topology.java
----------------------------------------------------------------------
diff --git 
a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/Topology.java
 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/Topology.java
new file mode 100644
index 0000000..581a843
--- /dev/null
+++ 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/Topology.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.services.storm.client.json.model;
+
+public class Topology {
+       private String id ;
+       private String name ;
+       private String status ;
+       
+       public String getId() {
+               return id;
+       }
+       public void setId(String id) {
+               this.id = id;
+       }
+       public String getName() {
+               return name;
+       }
+       public void setName(String name) {
+               this.name = name;
+       }
+       public String getStatus() {
+               return status;
+       }
+       public void setStatus(String status) {
+               this.status = status;
+       }
+       
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a67360d2/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/TopologyListResponse.java
----------------------------------------------------------------------
diff --git 
a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/TopologyListResponse.java
 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/TopologyListResponse.java
new file mode 100644
index 0000000..e8de8c9
--- /dev/null
+++ 
b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/json/model/TopologyListResponse.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.services.storm.client.json.model;
+
+import java.util.List;
+
+import com.google.gson.annotations.SerializedName;
+
+public class TopologyListResponse {
+       @SerializedName("topologies")
+       private List<Topology>  topologyList;
+
+       public List<Topology> getTopologyList() {
+               return topologyList;
+       }
+
+       public void setTopologyList(List<Topology> topologyList) {
+               this.topologyList = topologyList;
+       }
+       
+}

Reply via email to