Repository: zeppelin Updated Branches: refs/heads/master dd20e7bf8 -> 4a1a2e48b
[ZEPPELIN-1627] - Fix latency on notebook save in ZeppelinHubNotebookRepo ### What is this PR for? Fix latency issue when user save a notebook while using zeppelinhub notebook storage. ### What type of PR is it? [Improvement] ### Jira issue [ZEPPELIN-1627](https://issues.apache.org/jira/browse/ZEPPELIN-1627) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Anthony Corbacho <[email protected]> Closes #1591 from anthonycorbacho/feat/ZeppelinHubRepoSaveNoteShouldNotBlock and squashes the following commits: c0b0875 [Anthony Corbacho] Remove unused variables ae52108 [Anthony Corbacho] Fix latency on notebook save in ZeppelinHubNotebookRepo Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/4a1a2e48 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/4a1a2e48 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/4a1a2e48 Branch: refs/heads/master Commit: 4a1a2e48bffa8b18ee10629d7283c7f77aff4cbb Parents: dd20e7b Author: Anthony Corbacho <[email protected]> Authored: Fri Nov 4 15:07:31 2016 +0900 Committer: Anthony Corbacho <[email protected]> Committed: Sat Nov 5 14:03:08 2016 +0900 ---------------------------------------------------------------------- .../rest/ZeppelinhubRestApiHandler.java | 44 +++++++++++++------- 1 file changed, 30 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4a1a2e48/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java index 82159fc..63699e6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java @@ -27,6 +27,7 @@ import org.apache.commons.lang.StringUtils; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.client.util.StringContentProvider; import org.eclipse.jetty.http.HttpMethod; @@ -114,7 +115,7 @@ public class ZeppelinhubRestApiHandler { } public String asyncGet(String argument) throws IOException { - return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument); + return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument, StringUtils.EMPTY, true); } public String asyncPutWithResponseBody(String url, String json) throws IOException { @@ -122,7 +123,7 @@ public class ZeppelinhubRestApiHandler { LOG.error("Empty note, cannot send it to zeppelinHub"); throw new IOException("Cannot send emtpy note to zeppelinHub"); } - return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json); + return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, true); } public void asyncPut(String jsonNote) throws IOException { @@ -130,7 +131,7 @@ public class ZeppelinhubRestApiHandler { LOG.error("Cannot save empty note/string to ZeppelinHub"); return; } - sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote); + sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, false); } public void asyncDel(String argument) throws IOException { @@ -138,28 +139,41 @@ public class ZeppelinhubRestApiHandler { LOG.error("Cannot delete empty note from ZeppelinHub"); return; } - sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument); + sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, false); } - private String sendToZeppelinHub(HttpMethod method, String url) throws IOException { - return sendToZeppelinHub(method, url, StringUtils.EMPTY); + private String sendToZeppelinHub(HttpMethod method, String url, String json, boolean withResponse) + throws IOException { + Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token); + if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST)) + && !StringUtils.isBlank(json)) { + request.content(new StringContentProvider(json, "UTF-8"), "application/json;charset=UTF-8"); + } + return withResponse ? + sendToZeppelinHub(request) : sendToZeppelinHubWithoutResponseBody(request); + } + + private String sendToZeppelinHubWithoutResponseBody(Request request) throws IOException { + request.send(new Response.CompleteListener() { + @Override + public void onComplete(Result result) { + Request req = result.getRequest(); + LOG.info("ZeppelinHub {} {} returned with status {}: {}", req.getMethod(), + req.getURI(), result.getResponse().getStatus(), result.getResponse().getReason()); + } + }); + return StringUtils.EMPTY; } - private String sendToZeppelinHub(HttpMethod method, String url, String json) throws IOException { + private String sendToZeppelinHub(final Request request) throws IOException { InputStreamResponseListener listener = new InputStreamResponseListener(); Response response; String data; - - Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token); - if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST)) && - !StringUtils.isBlank(json)) { - request.content(new StringContentProvider(json, "UTF-8"), "application/json;charset=UTF-8"); - } request.send(listener); - try { response = listener.get(30, TimeUnit.SECONDS); } catch (InterruptedException | TimeoutException | ExecutionException e) { + String method = request.getMethod(); LOG.error("Cannot perform {} request to ZeppelinHub", method, e); throw new IOException("Cannot perform " + method + " request to ZeppelinHub", e); } @@ -170,6 +184,8 @@ public class ZeppelinhubRestApiHandler { data = IOUtils.toString(responseContent, "UTF-8"); } } else { + String method = response.getRequest().getMethod(); + String url = response.getRequest().getURI().toString(); LOG.error("ZeppelinHub {} {} returned with status {} ", method, url, code); throw new IOException("Cannot perform " + method + " request to ZeppelinHub"); }
