Repository: zeppelin Updated Branches: refs/heads/branch-0.6 40dfdecb8 -> a1f8a17a1
ZEPPELIN-1037 Enable Kerberos support in Livy Interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a1f8a17a Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a1f8a17a Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a1f8a17a Branch: refs/heads/branch-0.6 Commit: a1f8a17a166784cb62680ba1451fcc3cf19556af Parents: 40dfdec Author: Renjith Kamath <[email protected]> Authored: Tue Jun 21 23:35:14 2016 +0530 Committer: Prabhjyot Singh <[email protected]> Committed: Mon Jun 27 21:36:09 2016 +0530 ---------------------------------------------------------------------- livy/pom.xml | 11 ++ .../org/apache/zeppelin/livy/LivyHelper.java | 108 ++++++++----------- .../src/main/resources/interpreter-setting.json | 107 ++++++++++++++++++ 3 files changed, 162 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1f8a17a/livy/pom.xml ---------------------------------------------------------------------- diff --git a/livy/pom.xml b/livy/pom.xml index 90f149c..6fa12b9 100644 --- a/livy/pom.xml +++ b/livy/pom.xml @@ -96,6 +96,17 @@ <version>${mockito.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.springframework.security.kerberos</groupId> + <artifactId>spring-security-kerberos-client</artifactId> + <version>1.0.1.RELEASE</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-web</artifactId> + <version>4.3.0.RELEASE</version> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1f8a17a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 7f3517e..2c66fa9 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -20,31 +20,24 @@ package org.apache.zeppelin.livy; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; - import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.HttpClientBuilder; + import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.security.kerberos.client.KerberosRestTemplate; +import org.springframework.web.client.RestTemplate; -import java.io.BufferedReader; -import java.io.InputStreamReader; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Properties; /*** @@ -64,23 +57,23 @@ public class LivyHelper { public Integer createSession(InterpreterContext context, String kind) throws Exception { try { Map<String, String> conf = new HashMap<String, String>(); - + Iterator<Entry<Object, Object>> it = property.entrySet().iterator(); while (it.hasNext()) { Entry<Object, Object> pair = it.next(); - if (pair.getKey().toString().startsWith("livy.spark.") && + if (pair.getKey().toString().startsWith("livy.spark.") && !pair.getValue().toString().isEmpty()) conf.put(pair.getKey().toString().substring(5), pair.getValue().toString()); } String confData = gson.toJson(conf); + String user = context.getAuthenticationInfo().getUser(); - String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", - "POST", + String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", "POST", "{" + "\"kind\": \"" + kind + "\", " + - "\"conf\": " + confData + ", " + - "\"proxyUser\": " + context.getAuthenticationInfo().getUser() + + "\"conf\": " + confData + ", " + + "\"proxyUser\": " + (StringUtils.isEmpty(user) ? null : "\"" + user + "\"") + "}", context.getParagraphId() ); @@ -96,9 +89,8 @@ public class LivyHelper { LOGGER.error(String.format("sessionId:%s state is %s", jsonMap.get("id"), jsonMap.get("state"))); Thread.sleep(1000); - json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId, - "GET", null, - context.getParagraphId()); + json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + + sessionId, "GET", null, context.getParagraphId()); jsonMap = (Map<Object, Object>) gson.fromJson(json, new TypeToken<Map<Object, Object>>() { }.getType()); @@ -335,66 +327,54 @@ public class LivyHelper { } } + private RestTemplate getRestTemplate() { + String keytabLocation = property.getProperty("zeppelin.livy.keytab"); + String principal = property.getProperty("zeppelin.livy.principal"); + if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) { + return new KerberosRestTemplate(keytabLocation, principal); + } + return new RestTemplate(); + } + protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId) throws Exception { - HttpClient client = HttpClientBuilder.create().build(); - HttpResponse response = null; + RestTemplate restTemplate = getRestTemplate(); + HttpHeaders headers = new HttpHeaders(); + headers.add("Content-Type", "application/json"); + ResponseEntity<String> response = null; if (method.equals("POST")) { - HttpPost request = new HttpPost(targetURL); - request.addHeader("Content-Type", "application/json"); - StringEntity se = new StringEntity(jsonData); - request.setEntity(se); - response = client.execute(request); - paragraphHttpMap.put(paragraphId, request); + HttpEntity<String> entity = new HttpEntity<String>(jsonData, headers); + response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class); + paragraphHttpMap.put(paragraphId, response); } else if (method.equals("GET")) { - HttpGet request = new HttpGet(targetURL); - request.addHeader("Content-Type", "application/json"); - response = client.execute(request); - paragraphHttpMap.put(paragraphId, request); + HttpEntity<String> entity = new HttpEntity<String>(headers); + response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class); + paragraphHttpMap.put(paragraphId, response); } else if (method.equals("DELETE")) { - HttpDelete request = new HttpDelete(targetURL); - request.addHeader("Content-Type", "application/json"); - response = client.execute(request); + HttpEntity<String> entity = new HttpEntity<String>(headers); + response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class); } - if (response == null) { return null; } - if (response.getStatusLine().getStatusCode() == 200 - || response.getStatusLine().getStatusCode() == 201 - || response.getStatusLine().getStatusCode() == 404) { - return getResponse(response); + if (response.getStatusCode().value() == 200 + || response.getStatusCode().value() == 201 + || response.getStatusCode().value() == 404) { + return response.getBody(); } else { - String responseString = getResponse(response); + String responseString = response.getBody(); if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) { return responseString; } LOGGER.error(String.format("Error with %s StatusCode: %s", - response.getStatusLine().getStatusCode(), responseString)); + response.getStatusCode().value(), responseString)); throw new Exception(String.format("Error with %s StatusCode: %s", - response.getStatusLine().getStatusCode(), responseString)); + response.getStatusCode().value(), responseString)); } } - private String getResponse(HttpResponse response) throws Exception { - BufferedReader rd = new BufferedReader( - new InputStreamReader(response.getEntity().getContent())); - - StringBuffer result = new StringBuffer(); - String line = ""; - while ((line = rd.readLine()) != null) { - result.append(line); - } - return result.toString(); - } - public void cancelHTTP(String paragraphId) { - if (paragraphHttpMap.get(paragraphId).getClass().getName().contains("HttpPost")) { - ((HttpPost) paragraphHttpMap.get(paragraphId)).abort(); - } else { - ((HttpGet) paragraphHttpMap.get(paragraphId)).abort(); - } paragraphHttpMap.put(paragraphId, null); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1f8a17a/livy/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json new file mode 100644 index 0000000..7ae435f --- /dev/null +++ b/livy/src/main/resources/interpreter-setting.json @@ -0,0 +1,107 @@ +[ + { + "group": "livy", + "name": "spark", + "className": "org.apache.zeppelin.livy.LivySparkInterpreter", + "properties": { + "zeppelin.livy.url": { + "envName": "ZEPPELIN_LIVY_HOST_URL", + "propertyName": "zeppelin.livy.url", + "defaultValue": "http://localhost:8998", + "description": "The URL for Livy Server." + }, + "livy.spark.master": { + "propertyName": "livy.spark.master", + "defaultValue": "local[*]", + "description": "Spark master uri. ex) spark://masterhost:7077" + }, + "livy.spark.driver.cores": { + "propertyName": "livy.spark.driver.cores", + "defaultValue": "", + "description": "Driver cores. ex) 1, 2" + }, + "livy.spark.driver.memory": { + "propertyName": "livy.spark.driver.memory", + "defaultValue": "", + "description": "Driver memory. ex) 512m, 32g" + }, + "livy.spark.executor.instances": { + "propertyName": "livy.spark.executor.instances", + "defaultValue": "", + "description": "Executor instances. ex) 1, 4" + }, + "livy.spark.executor.cores": { + "propertyName": "livy.spark.executor.cores", + "defaultValue": "", + "description": "Num cores per executor. ex) 1, 4" + }, + "livy.spark.executor.memory": { + "propertyName": "livy.spark.executor.memory", + "defaultValue": "", + "description": "Executor memory per worker instance. ex) 512m, 32g" + }, + "livy.spark.dynamicAllocation.enabled": { + "propertyName": "livy.spark.dynamicAllocation.enabled", + "defaultValue": "", + "description": "Use dynamic resource allocation" + }, + "livy.spark.dynamicAllocation.cachedExecutorIdleTimeout": { + "propertyName": "livy.spark.dynamicAllocation.cachedExecutorIdleTimeout", + "defaultValue": "", + "description": "Remove an executor which has cached data blocks" + }, + "livy.spark.dynamicAllocation.minExecutors": { + "propertyName": "livy.spark.dynamicAllocation.minExecutors", + "defaultValue": "", + "description": "Lower bound for the number of executors if dynamic allocation is enabled." + }, + "livy.spark.dynamicAllocation.initialExecutors": { + "propertyName": "livy.spark.dynamicAllocation.initialExecutors", + "defaultValue": "", + "description": "Initial number of executors to run if dynamic allocation is enabled." + }, + "livy.spark.dynamicAllocation.maxExecutors": { + "propertyName": "livy.spark.dynamicAllocation.maxExecutors", + "defaultValue": "", + "description": "Upper bound for the number of executors if dynamic allocation is enabled." + }, + "zeppelin.livy.principal": { + "propertyName": "zeppelin.livy.principal", + "defaultValue": "", + "description": "Kerberos principal to authenticate livy" + }, + "zeppelin.livy.keytab": { + "propertyName": "zeppelin.livy.keytab", + "defaultValue": "", + "description": "Kerberos keytab to authenticate livy" + } + } + }, + { + "group": "livy", + "name": "sql", + "className": "org.apache.zeppelin.livy.LivySparkSQLInterpreter", + "properties": { + "zeppelin.livy.spark.sql.maxResult": { + "envName": "ZEPPELIN_LIVY_MAXRESULT", + "propertyName": "zeppelin.livy.spark.sql.maxResult", + "defaultValue": "1000", + "description": "Max number of SparkSQL result to display." + } + } + }, + { + "group": "livy", + "name": "pyspark", + "className": "org.apache.zeppelin.livy.LivyPySparkInterpreter", + "properties": { + } + }, + { + "group": "livy", + "name": "sparkr", + "className": "org.apache.zeppelin.livy.LivySparkRInterpreter", + "properties": { + } + } +] \ No newline at end of file
