Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 40dfdecb8 -> a1f8a17a1


ZEPPELIN-1037 Enable Kerberos support in Livy Interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a1f8a17a
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a1f8a17a
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a1f8a17a

Branch: refs/heads/branch-0.6
Commit: a1f8a17a166784cb62680ba1451fcc3cf19556af
Parents: 40dfdec
Author: Renjith Kamath <[email protected]>
Authored: Tue Jun 21 23:35:14 2016 +0530
Committer: Prabhjyot Singh <[email protected]>
Committed: Mon Jun 27 21:36:09 2016 +0530

----------------------------------------------------------------------
 livy/pom.xml                                    |  11 ++
 .../org/apache/zeppelin/livy/LivyHelper.java    | 108 ++++++++-----------
 .../src/main/resources/interpreter-setting.json | 107 ++++++++++++++++++
 3 files changed, 162 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1f8a17a/livy/pom.xml
----------------------------------------------------------------------
diff --git a/livy/pom.xml b/livy/pom.xml
index 90f149c..6fa12b9 100644
--- a/livy/pom.xml
+++ b/livy/pom.xml
@@ -96,6 +96,17 @@
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.security.kerberos</groupId>
+            <artifactId>spring-security-kerberos-client</artifactId>
+            <version>1.0.1.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+            <version>4.3.0.RELEASE</version>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1f8a17a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java 
b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
index 7f3517e..2c66fa9 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
@@ -20,31 +20,24 @@ package org.apache.zeppelin.livy;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
-
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClientBuilder;
+
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.kerberos.client.KerberosRestTemplate;
+import org.springframework.web.client.RestTemplate;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Properties;
 
 
 /***
@@ -64,23 +57,23 @@ public class LivyHelper {
   public Integer createSession(InterpreterContext context, String kind) throws 
Exception {
     try {
       Map<String, String> conf = new HashMap<String, String>();
-      
+
       Iterator<Entry<Object, Object>> it = property.entrySet().iterator();
       while (it.hasNext()) {
         Entry<Object, Object> pair = it.next();
-        if (pair.getKey().toString().startsWith("livy.spark.") && 
+        if (pair.getKey().toString().startsWith("livy.spark.") &&
             !pair.getValue().toString().isEmpty())
           conf.put(pair.getKey().toString().substring(5), 
pair.getValue().toString());
       }
 
       String confData = gson.toJson(conf);
+      String user = context.getAuthenticationInfo().getUser();
 
-      String json = executeHTTP(property.getProperty("zeppelin.livy.url") + 
"/sessions",
-          "POST", 
+      String json = executeHTTP(property.getProperty("zeppelin.livy.url") + 
"/sessions", "POST",
           "{" +
               "\"kind\": \"" + kind + "\", " +
-              "\"conf\": " + confData + ", " + 
-              "\"proxyUser\": " + context.getAuthenticationInfo().getUser() + 
+              "\"conf\": " + confData + ", " +
+              "\"proxyUser\": " + (StringUtils.isEmpty(user) ? null : "\"" + 
user + "\"") +
               "}",
           context.getParagraphId()
       );
@@ -96,9 +89,8 @@ public class LivyHelper {
           LOGGER.error(String.format("sessionId:%s state is %s",
               jsonMap.get("id"), jsonMap.get("state")));
           Thread.sleep(1000);
-          json = executeHTTP(property.getProperty("zeppelin.livy.url") + 
"/sessions/" + sessionId,
-              "GET", null,
-              context.getParagraphId());
+          json = executeHTTP(property.getProperty("zeppelin.livy.url") + 
"/sessions/" +
+              sessionId, "GET", null, context.getParagraphId());
           jsonMap = (Map<Object, Object>) gson.fromJson(json,
               new TypeToken<Map<Object, Object>>() {
               }.getType());
@@ -335,66 +327,54 @@ public class LivyHelper {
     }
   }
 
+  private RestTemplate getRestTemplate() {
+    String keytabLocation = property.getProperty("zeppelin.livy.keytab");
+    String principal = property.getProperty("zeppelin.livy.principal");
+    if (StringUtils.isNotEmpty(keytabLocation) && 
StringUtils.isNotEmpty(principal)) {
+      return new KerberosRestTemplate(keytabLocation, principal);
+    }
+    return new RestTemplate();
+  }
+
   protected String executeHTTP(String targetURL, String method, String 
jsonData, String paragraphId)
       throws Exception {
-    HttpClient client = HttpClientBuilder.create().build();
-    HttpResponse response = null;
+    RestTemplate restTemplate = getRestTemplate();
+    HttpHeaders headers = new HttpHeaders();
+    headers.add("Content-Type", "application/json");
+    ResponseEntity<String> response = null;
     if (method.equals("POST")) {
-      HttpPost request = new HttpPost(targetURL);
-      request.addHeader("Content-Type", "application/json");
-      StringEntity se = new StringEntity(jsonData);
-      request.setEntity(se);
-      response = client.execute(request);
-      paragraphHttpMap.put(paragraphId, request);
+      HttpEntity<String> entity = new HttpEntity<String>(jsonData, headers);
+      response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, 
String.class);
+      paragraphHttpMap.put(paragraphId, response);
     } else if (method.equals("GET")) {
-      HttpGet request = new HttpGet(targetURL);
-      request.addHeader("Content-Type", "application/json");
-      response = client.execute(request);
-      paragraphHttpMap.put(paragraphId, request);
+      HttpEntity<String> entity = new HttpEntity<String>(headers);
+      response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, 
String.class);
+      paragraphHttpMap.put(paragraphId, response);
     } else if (method.equals("DELETE")) {
-      HttpDelete request = new HttpDelete(targetURL);
-      request.addHeader("Content-Type", "application/json");
-      response = client.execute(request);
+      HttpEntity<String> entity = new HttpEntity<String>(headers);
+      response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, 
String.class);
     }
-
     if (response == null) {
       return null;
     }
 
-    if (response.getStatusLine().getStatusCode() == 200
-        || response.getStatusLine().getStatusCode() == 201
-        || response.getStatusLine().getStatusCode() == 404) {
-      return getResponse(response);
+    if (response.getStatusCode().value() == 200
+            || response.getStatusCode().value() == 201
+            || response.getStatusCode().value() == 404) {
+      return response.getBody();
     } else {
-      String responseString = getResponse(response);
+      String responseString = response.getBody();
       if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) 
{
         return responseString;
       }
       LOGGER.error(String.format("Error with %s StatusCode: %s",
-          response.getStatusLine().getStatusCode(), responseString));
+              response.getStatusCode().value(), responseString));
       throw new Exception(String.format("Error with %s StatusCode: %s",
-          response.getStatusLine().getStatusCode(), responseString));
+              response.getStatusCode().value(), responseString));
     }
   }
 
-  private String getResponse(HttpResponse response) throws Exception {
-    BufferedReader rd = new BufferedReader(
-        new InputStreamReader(response.getEntity().getContent()));
-
-    StringBuffer result = new StringBuffer();
-    String line = "";
-    while ((line = rd.readLine()) != null) {
-      result.append(line);
-    }
-    return result.toString();
-  }
-
   public void cancelHTTP(String paragraphId) {
-    if 
(paragraphHttpMap.get(paragraphId).getClass().getName().contains("HttpPost")) {
-      ((HttpPost) paragraphHttpMap.get(paragraphId)).abort();
-    } else {
-      ((HttpGet) paragraphHttpMap.get(paragraphId)).abort();
-    }
     paragraphHttpMap.put(paragraphId, null);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1f8a17a/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json 
b/livy/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..7ae435f
--- /dev/null
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -0,0 +1,107 @@
+[
+  {
+    "group": "livy",
+    "name": "spark",
+    "className": "org.apache.zeppelin.livy.LivySparkInterpreter",
+    "properties": {
+      "zeppelin.livy.url": {
+        "envName": "ZEPPELIN_LIVY_HOST_URL",
+        "propertyName": "zeppelin.livy.url",
+        "defaultValue": "http://localhost:8998";,
+        "description": "The URL for Livy Server."
+      },
+      "livy.spark.master": {
+        "propertyName": "livy.spark.master",
+        "defaultValue": "local[*]",
+        "description": "Spark master uri. ex) spark://masterhost:7077"
+      },
+      "livy.spark.driver.cores": {
+        "propertyName": "livy.spark.driver.cores",
+        "defaultValue": "",
+        "description": "Driver cores. ex) 1, 2"
+      },
+      "livy.spark.driver.memory": {
+        "propertyName": "livy.spark.driver.memory",
+        "defaultValue": "",
+        "description": "Driver memory. ex) 512m, 32g"
+      },
+      "livy.spark.executor.instances": {
+        "propertyName": "livy.spark.executor.instances",
+        "defaultValue": "",
+        "description": "Executor instances. ex) 1, 4"
+      },
+      "livy.spark.executor.cores": {
+        "propertyName": "livy.spark.executor.cores",
+        "defaultValue": "",
+        "description": "Num cores per executor. ex) 1, 4"
+      },
+      "livy.spark.executor.memory": {
+        "propertyName": "livy.spark.executor.memory",
+        "defaultValue": "",
+        "description": "Executor memory per worker instance. ex) 512m, 32g"
+      },
+      "livy.spark.dynamicAllocation.enabled": {
+        "propertyName": "livy.spark.dynamicAllocation.enabled",
+        "defaultValue": "",
+        "description": "Use dynamic resource allocation"
+      },
+      "livy.spark.dynamicAllocation.cachedExecutorIdleTimeout": {
+        "propertyName": 
"livy.spark.dynamicAllocation.cachedExecutorIdleTimeout",
+        "defaultValue": "",
+        "description": "Remove an executor which has cached data blocks"
+      },
+      "livy.spark.dynamicAllocation.minExecutors": {
+        "propertyName": "livy.spark.dynamicAllocation.minExecutors",
+        "defaultValue": "",
+        "description": "Lower bound for the number of executors if dynamic 
allocation is enabled."
+      },
+      "livy.spark.dynamicAllocation.initialExecutors": {
+        "propertyName": "livy.spark.dynamicAllocation.initialExecutors",
+        "defaultValue": "",
+        "description": "Initial number of executors to run if dynamic 
allocation is enabled."
+      },
+      "livy.spark.dynamicAllocation.maxExecutors": {
+        "propertyName": "livy.spark.dynamicAllocation.maxExecutors",
+        "defaultValue": "",
+        "description": "Upper bound for the number of executors if dynamic 
allocation is enabled."
+      },
+      "zeppelin.livy.principal": {
+        "propertyName": "zeppelin.livy.principal",
+        "defaultValue": "",
+        "description": "Kerberos principal to authenticate livy"
+      },
+      "zeppelin.livy.keytab": {
+        "propertyName": "zeppelin.livy.keytab",
+        "defaultValue": "",
+        "description": "Kerberos keytab to authenticate livy"
+      }
+    }
+  },
+  {
+    "group": "livy",
+    "name": "sql",
+    "className": "org.apache.zeppelin.livy.LivySparkSQLInterpreter",
+    "properties": {
+      "zeppelin.livy.spark.sql.maxResult": {
+        "envName": "ZEPPELIN_LIVY_MAXRESULT",
+        "propertyName": "zeppelin.livy.spark.sql.maxResult",
+        "defaultValue": "1000",
+        "description": "Max number of SparkSQL result to display."
+      }
+    }
+  },
+  {
+    "group": "livy",
+    "name": "pyspark",
+    "className": "org.apache.zeppelin.livy.LivyPySparkInterpreter",
+    "properties": {
+    }
+  },
+  {
+    "group": "livy",
+    "name": "sparkr",
+    "className": "org.apache.zeppelin.livy.LivySparkRInterpreter",
+    "properties": {
+    }
+  }
+]
\ No newline at end of file

Reply via email to