[ 
https://issues.apache.org/jira/browse/GOBBLIN-683?focusedWorklogId=201530&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-201530
 ]

ASF GitHub Bot logged work on GOBBLIN-683:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Feb/19 19:08
            Start Date: 20/Feb/19 19:08
    Worklog Time Spent: 10m 
      Work Description: yukuai518 commented on pull request #2555: 
[GOBBLIN-683] Add azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258633224
 
 

 ##########
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##########
 @@ -402,97 +329,71 @@ public AzkabanExecuteFlowStatus executeFlowWithOptions(
    *
    * @return The status object which contains success status and execution id.
    */
-  public AzkabanExecuteFlowStatus executeFlow(
-      String projectName,
-      String flowName,
-      Map<String, String> flowParameters) {
+  public AzkabanExecuteFlowStatus executeFlow(String projectName,
+                                              String flowName,
+                                              Map<String, String> 
flowParameters) throws AzkabanClientException {
     return executeFlowWithOptions(projectName, flowName, null, flowParameters);
   }
 
   /**
    * Cancel a flow by execution id.
    */
-  public AzkabanClientStatus cancelFlow(int execId) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, 
String.valueOf(execId)));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
-
-      HttpGet httpGet = new HttpGet(url + "/executor?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      try {
-        handleResponse(response);
-        return new AzkabanClientStatus.SUCCESS();
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanClientStatus.FAIL("", e);
-    }
+  public AzkabanClientStatus cancelFlow(String execId) throws 
AzkabanClientException {
+    AzkabanMultiCallables.CancelFlowCallable callable =
+        new AzkabanMultiCallables.CancelFlowCallable(this,
+            execId);
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
+  /**
+   * Fetch an execution log.
+   */
+  public AzkabanClientStatus fetchExecutionLog(String execId,
+                                               String jobId,
+                                               String offset,
+                                               String length,
+                                               File ouf) throws 
AzkabanClientException {
+    AzkabanMultiCallables.FetchExecLogCallable callable =
+        new AzkabanMultiCallables.FetchExecLogCallable(this,
+            execId,
+            jobId,
+            offset,
+            length,
+            ouf);
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
+  }
 
   /**
-   * Given an execution id, fetches all the detailed information of that 
execution, including a list of all the job executions.
+   * Given an execution id, fetches all the detailed information of that 
execution,
+   * including a list of all the job executions.
    *
    * @param execId execution id to be fetched.
    *
-   * @return The status object which contains success status and all the 
detailed information of that execution.
+   * @return The status object which contains success status and all the 
detailed
+   *         information of that execution.
    */
-  public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, 
"fetchexecflow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
-
-      HttpGet httpGet = new HttpGet(url + "/executor?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      try {
-        Map<String, String> map = handleResponse(response);
-        return new AzkabanFetchExecuteFlowStatus(new 
AzkabanFetchExecuteFlowStatus.Execution(map));
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
-          + "fetch execId " + execId, e);
-    }
-  }
+  public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) 
throws AzkabanClientException {
+    AzkabanMultiCallables.FetchFlowExecCallable callable =
+        new AzkabanMultiCallables.FetchFlowExecCallable(this, execId);
 
-  private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> 
flowParams) {
-    if (flowParams != null) {
-      for (Map.Entry<String, String> entry : flowParams.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
-          log.debug("New flow parameter added:" + key + "-->" + value);
-          nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value));
-        }
-      }
-    }
+    return runWithRetry(callable, AzkabanFetchExecuteFlowStatus.class);
   }
 
-  private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> 
flowOptions) {
-    if (flowOptions != null) {
-      for (Map.Entry<String, String> option : flowOptions.entrySet()) {
-        log.debug("New flow option added:" + option .getKey()+ "-->" + 
option.getValue());
-        nvps.add(new BasicNameValuePair(option.getKey(), option.getValue()));
+  private <T> T runWithRetry(Callable callable, Class<T> cls) throws 
AzkabanClientException {
+    try {
+      AzkabanClientStatus status = this.retryer.call(callable);
 
 Review comment:
   I force a retry now. Please review the latest
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 201530)
    Time Spent: 1h 10m  (was: 1h)

> Azkaban client should retry if session gets expired
> ---------------------------------------------------
>
>                 Key: GOBBLIN-683
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-683
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Kuai Yu
>            Assignee: Kuai Yu
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to