Repository: zeppelin Updated Branches: refs/heads/master a424f5c65 -> 862871f67
ZEPPELIN-2953 Allow custom http header for livy interpreter ### What is this PR for? This PR allows users to add custom HTTP headers when calling the Livy REST API. Users just need to specify `zeppelin.livy.http.headers` in the Livy interpreter setting. ### What type of PR is it? [Feature] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2953 ### How should this be tested? Outline the steps to test the PR here. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2600 from zjffdu/ZEPPELIN-2953 and squashes the following commits: 720d8d0 [Jeff Zhang] ZEPPELIN-2953 Allow custom http header for livy interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/862871f6 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/862871f6 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/862871f6 Branch: refs/heads/master Commit: 862871f67afdc3a6357a759e1543f821bed7d7d2 Parents: a424f5c Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Sep 26 10:12:48 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Oct 13 09:28:10 2017 +0800 ---------------------------------------------------------------------- docs/interpreter/livy.md | 7 ++++- .../zeppelin/livy/BaseLivyInterpreter.java | 33 ++++++++++++++++++++ .../zeppelin/livy/LivySQLInterpreterTest.java | 8 +++++ 3 files changed, 47 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/862871f6/docs/interpreter/livy.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index 1741a80..09bf6e1 100644 --- a/docs/interpreter/livy.md 
+++ b/docs/interpreter/livy.md @@ -144,7 +144,12 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory` <td>zeppelin.livy.ssl.trustStorePassword</td> <td></td> <td>password for trustStore file. Used when livy ssl is enabled</td> - </tr> + </tr> + <tr> + <td>zeppelin.livy.http.headers</td> + <td>key_1: value_1; key_2: value_2</td> + <td>custom http headers when calling livy rest api. Each http header is separated by `;`, and each header is one key value pair where key value is separated by `:`</td> + </tr> </table> **We remove livy.spark.master in zeppelin-0.7. Because we sugguest user to use livy 0.3 in zeppelin-0.7. And livy 0.3 don't allow to specify livy.spark.master, it enfornce yarn-cluster mode.** http://git-wip-us.apache.org/repos/asf/zeppelin/blob/862871f6/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java index ccab09b..b725348 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java @@ -62,6 +62,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** @@ -80,6 +82,7 @@ public abstract class BaseLivyInterpreter extends Interpreter { protected boolean displayAppInfo; protected LivyVersion livyVersion; private RestTemplate restTemplate; + private Map<String, String> customHeaders = new HashMap<>(); Set<Object> paragraphsToCancel = Collections.newSetFromMap( new ConcurrentHashMap<Object, Boolean>()); @@ -96,6 +99,33 @@ public abstract class BaseLivyInterpreter extends Interpreter { this.pullStatusInterval = Integer.parseInt( 
property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + "")); this.restTemplate = createRestTemplate(); + if (!StringUtils.isBlank(property.getProperty("zeppelin.livy.http.headers"))) { + String[] headers = property.getProperty("zeppelin.livy.http.headers").split(";"); + for (String header : headers) { + String[] splits = header.split(":", -1); + if (splits.length != 2) { + throw new RuntimeException("Invalid format of http headers: " + header + + ", valid http header format is HEADER_NAME:HEADER_VALUE"); + } + customHeaders.put(splits[0].trim(), envSubstitute(splits[1].trim())); + } + } + } + + private String envSubstitute(String value) { + String newValue = new String(value); + Pattern pattern = Pattern.compile("\\$\\{(.*)\\}"); + Matcher matcher = pattern.matcher(value); + while (matcher.find()) { + String env = matcher.group(1); + newValue = newValue.replace("${" + env + "}", System.getenv(env)); + } + return newValue; + } + + // only for testing + Map<String, String> getCustomHeaders() { + return customHeaders; } public abstract String getSessionKind(); @@ -523,6 +553,9 @@ public abstract class BaseLivyInterpreter extends Interpreter { HttpHeaders headers = new HttpHeaders(); headers.add("Content-Type", MediaType.APPLICATION_JSON_UTF8_VALUE); headers.add("X-Requested-By", "zeppelin"); + for (Map.Entry<String, String> entry : customHeaders.entrySet()) { + headers.add(entry.getKey(), entry.getValue()); + } ResponseEntity<String> response = null; try { if (method.equals("POST")) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/862871f6/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java index fdef9b1..24d70ec 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java 
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java @@ -39,10 +39,18 @@ public class LivySQLInterpreterTest { properties.setProperty("zeppelin.livy.url", "http://localhost:8998"); properties.setProperty("zeppelin.livy.session.create_timeout", "120"); properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3"); + properties.setProperty("zeppelin.livy.http.headers", "HEADER_1: VALUE_1_${HOME}"); sqlInterpreter = new LivySparkSQLInterpreter(properties); } @Test + public void testHttpHeaders() { + assertEquals(1, sqlInterpreter.getCustomHeaders().size()); + assertTrue(sqlInterpreter.getCustomHeaders().get("HEADER_1").startsWith("VALUE_1_")); + assertNotEquals("VALUE_1_${HOME}", sqlInterpreter.getCustomHeaders().get("HEADER_1")); + } + + @Test public void testParseSQLOutput() { // Empty sql output // +---+---+