zjffdu commented on a change in pull request #3887:
URL: https://github.com/apache/zeppelin/pull/3887#discussion_r482144046



##########
File path: 
zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.GetRequest;
+import kong.unirest.HttpResponse;
+import kong.unirest.JsonNode;
+import kong.unirest.Unirest;
+import kong.unirest.apache.ApacheClient;
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import unirest.shaded.org.apache.http.client.HttpClient;
+import unirest.shaded.org.apache.http.impl.client.HttpClients;
+
+import javax.net.ssl.SSLContext;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Low level api for interacting with Zeppelin. Underneath, it use the 
zeppelin rest api.
+ * You can use this class to operate Zeppelin note/paragraph,
+ * e.g. get/add/delete/update/execute/cancel
+ */
+public class ZeppelinClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ZeppelinClient.class);
+
+  private ClientConfig clientConfig;
+
+  public ZeppelinClient(ClientConfig clientConfig) throws Exception {
+    this.clientConfig = clientConfig;
+    Unirest.config().defaultBaseUrl(clientConfig.getZeppelinRestUrl() + 
"/api");
+
+    if (clientConfig.isUseKnox()) {
+      try {
+        SSLContext sslContext = new 
SSLContextBuilder().loadTrustMaterial(null, new TrustSelfSignedStrategy() {
+          public boolean isTrusted(X509Certificate[] chain, String authType) {
+            return true;
+          }
+        }).build();
+        HttpClient customHttpClient = 
HttpClients.custom().setSSLContext(sslContext)
+                .setSSLHostnameVerifier(new NoopHostnameVerifier()).build();
+        Unirest.config().httpClient(ApacheClient.builder(customHttpClient));
+      } catch (Exception e) {
+        throw new Exception("Fail to setup httpclient of Unirest", e);
+      }
+    }
+  }
+
+  public ClientConfig getClientConfig() {
+    return clientConfig;
+  }
+
+  /**
+   * Throw exception if the status code is not 200.
+   *
+   * @param response
+   * @throws Exception
+   */
+  private void checkResponse(HttpResponse<JsonNode> response) throws Exception 
{
+    if (response.getStatus() == 302) {
+      throw new Exception("Please login first");
+    }
+    if (response.getStatus() != 200) {
+      throw new Exception(String.format("Unable to call rest api, status: %s, 
statusText: %s, message: %s",
+              response.getStatus(),
+              response.getStatusText(),
+              response.getBody().getObject().getString("message")));
+    }
+  }
+
+  /**
+   * Throw exception if the status in the json object is not `OK`.
+   *
+   * @param jsonNode
+   * @throws Exception
+   */
+  private void checkJsonNodeStatus(JsonNode jsonNode) throws Exception {
+    if (! "OK".equalsIgnoreCase(jsonNode.getObject().getString("status"))) {
+      throw new 
Exception(StringEscapeUtils.unescapeJava(jsonNode.getObject().getString("message")));
+    }
+  }
+
+  /**
+   * Get Zeppelin version.
+   *
+   * @return
+   * @throws Exception
+   */
+  public String getVersion() throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/version")
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return jsonNode.getObject().getJSONObject("body").getString("version");
+  }
+
+  /**
+   * Request a new session id. It doesn't create session (interpreter process) 
in zeppelin server side, but just
+   * create an unique session id.
+   *
+   * @param interpreter
+   * @return
+   * @throws Exception
+   */
+  public SessionInfo newSession(String interpreter) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .post("/session")
+            .queryString("interpreter", interpreter)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return new SessionInfo(jsonNode.getObject().getJSONObject("body"));
+  }
+
+  /**
+   * Stop the session(interpreter process) in Zeppelin server.
+   *
+   * @param sessionId
+   * @throws Exception
+   */
+  public void stopSession(String sessionId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .delete("/session/{sessionId}")
+            .routeParam("sessionId", sessionId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+
+  /**
+   * Get session info for the provided sessionId.
+   *
+   * @param sessionId
+   * @throws Exception
+   */
+  public SessionInfo getSession(String sessionId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/session/{sessionId}")
+            .routeParam("sessionId", sessionId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    JSONObject bodyObject = jsonNode.getObject().getJSONObject("body");
+    return new SessionInfo(bodyObject);
+  }
+
+  /**
+   * List all the sessions.
+   *
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> listSessions() throws Exception {
+    return listSessions(null);
+  }
+
+  /**
+   * List all the sessions for the provided interpreter.
+   *
+   * @param interpreter
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> listSessions(String interpreter) throws Exception {
+    GetRequest getRequest = Unirest.get("/session");
+    if (interpreter != null) {
+      getRequest.queryString("interpreter", interpreter);
+    }
+    HttpResponse<JsonNode> response = getRequest.asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    JSONArray sessionJsonArray = jsonNode.getObject().getJSONArray("body");
+    List<SessionInfo> sessionInfos = new ArrayList<>();
+    for (int i = 0; i< sessionJsonArray.length();++i) {
+      sessionInfos.add(new SessionInfo(sessionJsonArray.getJSONObject(i)));
+    }
+    return sessionInfos;
+  }
+
+  /**
+   * Login zeppelin with userName and password, throw exception if login fails.
+   *
+   * @param userName
+   * @param password
+   * @throws Exception
+   */
+  public void login(String userName, String password) throws Exception {
+    if (clientConfig.isUseKnox()) {
+      HttpResponse<String> response = Unirest.get("/")
+              .basicAuth(userName, password)
+              .asString();
+      if (response.getStatus() != 200) {
+        throw new Exception(String.format("Login failed, status: %s, 
statusText: %s",
+                response.getStatus(),
+                response.getStatusText()));
+      }
+    } else {
+      HttpResponse<JsonNode> response = Unirest
+              .post("/login")
+              .field("userName", userName)
+              .field("password", password)
+              .asJson();
+      if (response.getStatus() != 200) {
+        throw new Exception(String.format("Login failed, status: %s, 
statusText: %s",
+                response.getStatus(),
+                response.getStatusText()));
+      }
+    }
+  }
+
+  public String createNote(String notePath) throws Exception {
+    return createNote(notePath, "");
+  }
+
+  /**
+   * Create a new empty note with provided notePath and defaultInterpreterGroup
+   *
+   * @param notePath
+   * @param defaultInterpreterGroup
+   * @return
+   * @throws Exception
+   */
+  public String createNote(String notePath, String defaultInterpreterGroup) 
throws Exception {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("name", notePath);
+    bodyObject.put("defaultInterpreterGroup", defaultInterpreterGroup);
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook")
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    return jsonNode.getObject().getString("body");
+  }
+
+  /**
+   * Delete note with provided noteId.
+   *
+   * @param noteId
+   * @throws Exception
+   */
+  public void deleteNote(String noteId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .delete("/notebook/{noteId}")
+            .routeParam("noteId", noteId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+
+  /**
+   * Query {@link NoteResult} with provided noteId.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult queryNoteResult(String noteId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/notebook/{noteId}")
+            .routeParam("noteId", noteId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    JSONObject noteJsonObject = jsonNode.getObject().getJSONObject("body");
+    boolean isRunning = false;
+    if (noteJsonObject.has("info")) {
+      JSONObject infoJsonObject = noteJsonObject.getJSONObject("info");
+      if (infoJsonObject.has("isRunning")) {
+        isRunning = 
Boolean.parseBoolean(infoJsonObject.getString("isRunning"));
+      }
+    }
+
+    List<ParagraphResult> paragraphResultList = new ArrayList<>();
+    if (noteJsonObject.has("paragraphs")) {
+      JSONArray paragraphJsonArray = noteJsonObject.getJSONArray("paragraphs");
+      for (int i = 0; i< paragraphJsonArray.length(); ++i) {
+        paragraphResultList.add(new 
ParagraphResult(paragraphJsonArray.getJSONObject(i)));
+      }
+    }
+
+    return new NoteResult(noteId, isRunning, paragraphResultList);
+  }
+
+  /**
+   * Execute note with provided noteId, return until note execution is 
completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult executeNote(String noteId) throws Exception {
+    return executeNote(noteId, new HashMap<>());
+  }
+
+  /**
+   * Execute note with provided noteId and parameters, return until note 
execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public NoteResult executeNote(String noteId, Map<String, String> parameters) 
throws Exception {
+    submitNote(noteId, parameters);
+    return waitUntilNoteFinished(noteId);
+  }
+
+  /**
+   * Submit note to execute with provided noteId, return at once the 
submission is completed.
+   * You need to query {@link NoteResult} by yourself afterwards until note 
execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult submitNote(String noteId) throws Exception  {
+    return submitNote(noteId, new HashMap<>());
+  }
+
+  /**
+   * Submit note to execute with provided noteId and parameters, return at 
once the submission is completed.
+   * You need to query {@link NoteResult} by yourself afterwards until note 
execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public NoteResult submitNote(String noteId, Map<String, String> parameters) 
throws Exception  {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("params", parameters);
+    // run note in non-blocking and isolated way.
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook/job/{noteId}")
+            .routeParam("noteId", noteId)
+            .queryString("blocking", "false")
+            .queryString("isolated", "true")
+            .body(bodyObject)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return queryNoteResult(noteId);
+  }
+
+  /**
+   * Block there until note execution is completed.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult waitUntilNoteFinished(String noteId) throws Exception {

Review comment:
       There's another method, `waitUntilNoteFinished(String noteId, long 
timeoutInMills)`, which allows the user to set a timeout. The reason I didn't 
add a default large timeout is that streaming scenarios, such as a Flink 
streaming job, may run forever.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to