This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4353e00  [GOBBLIN-1340] Extend Azkaban client
4353e00 is described below

commit 4353e0054d19e3cc898547656848511a9c700089
Author: aprokofiev <[email protected]>
AuthorDate: Wed Jan 6 12:04:59 2021 -0800

    [GOBBLIN-1340] Extend Azkaban client
    
    [GOBBLIN-1340] Extend Azkaban client
    
    We extend the Azkaban client with API calls to check whether a
    project exists and to get the list of flows in a project. The log
    retrieval API is refactored to stream logs and to use strongly-typed
    parameters.
    
    Azkaban client tests can now run concurrently, and
    should be more stable.
    
    These changes will be used in the end-to-end
    Gobblin testing framework.
    
    Add missing Azkaban configuration key
    
    Address code review comments
    
    Closes #3176 from aplex/azkaban-client-ext
---
 .../gobblin/configuration/ConfigurationKeys.java   |   5 +-
 .../modules/orchestration/AzkabanClient.java       | 150 ++++++++++++++-------
 .../orchestration/AzkabanMultiCallables.java       |  86 +++++++++---
 .../orchestration/AzkabanProjectFlowsStatus.java   |  42 ++++++
 .../modules/orchestration/SessionHelper.java       |  12 +-
 .../modules/orchestration/AzkabanClientTest.java   | 126 ++++++++++-------
 6 files changed, 294 insertions(+), 127 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f93bcdc..39d4543 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -17,11 +17,11 @@
 
 package org.apache.gobblin.configuration;
 
+import com.google.common.base.Charsets;
+
 import java.nio.charset.Charset;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Charsets;
-
 
 /**
  * A central place for all Gobblin configuration property keys.
@@ -928,6 +928,7 @@ public class ConfigurationKeys {
   public static final String AZKABAN_FLOW_URL = "azkaban.link.workflow.url";
   public static final String AZKABAN_JOB_URL = "azkaban.link.job.url";
   public static final String AZKABAN_JOB_EXEC_URL = "azkaban.link.jobexec.url";
+  public static final String AZKABAN_WEBSERVERHOST = "azkaban.webserverhost";
 
   /**
    * Hive registration properties
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index 5f88fa9..c8e5380 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -17,9 +17,23 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.io.Closer;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,8 +42,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Builder;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -46,25 +61,6 @@ import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.rholder.retry.AttemptTimeLimiters;
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.SimpleTimeLimiter;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
-import lombok.Builder;
-
-import org.apache.gobblin.util.ExecutorsUtils;
-
 
 /**
  * A simple http based client that uses Ajax API to communicate with Azkaban 
server.
@@ -86,6 +82,7 @@ public class AzkabanClient implements Closeable {
   private Closer closer = Closer.create();
   private Retryer<AzkabanClientStatus> retryer;
   private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
+  private Duration requestTimeout;
 
   /**
    * Child class should have a different builderMethodName.
@@ -97,7 +94,8 @@ public class AzkabanClient implements Closeable {
                           long sessionExpireInMin,
                           CloseableHttpClient httpClient,
                           SessionManager sessionManager,
-                          ExecutorService executorService)
+                          ExecutorService executorService,
+                          Duration requestTimeout)
       throws AzkabanClientException {
     this.username = username;
     this.password = password;
@@ -106,12 +104,16 @@ public class AzkabanClient implements Closeable {
     this.httpClient = httpClient;
     this.sessionManager = sessionManager;
     this.executorService = executorService;
+    this.requestTimeout = ObjectUtils.defaultIfNull(requestTimeout, 
Duration.ofSeconds(10));
+
     this.initializeClient();
     this.initializeSessionManager();
     this.intializeExecutorService();
+
     this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
         .retryIfExceptionOfType(InvalidSessionException.class)
-        .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, 
TimeUnit.SECONDS, this.executorService))
+        
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.requestTimeout.toMillis(),
 TimeUnit.MILLISECONDS,
+            this.executorService))
         .withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
         .withStopStrategy(StopStrategies.stopAfterAttempt(3))
         .build();
@@ -157,9 +159,9 @@ public class AzkabanClient implements Closeable {
 
     HttpClientBuilder builder = HttpClientBuilder.create();
     RequestConfig requestConfig = RequestConfig.copy(RequestConfig.DEFAULT)
-          .setSocketTimeout(10000)
-          .setConnectTimeout(10000)
-          .setConnectionRequestTimeout(10000)
+          .setSocketTimeout((int) this.requestTimeout.toMillis())
+          .setConnectTimeout((int) this.requestTimeout.toMillis())
+          .setConnectionRequestTimeout((int) this.requestTimeout.toMillis())
           .build();
 
       builder.disableCookieManagement()
@@ -184,7 +186,7 @@ public class AzkabanClient implements Closeable {
         .toNanos();
 
     if (expired) {
-      log.info("Session expired. Generating a new session.");
+      log.debug("Session expired. Generating a new session.");
     } else if (forceRefresh) {
       log.info("Force to refresh session. Generating a new session.");
     }
@@ -204,20 +206,27 @@ public class AzkabanClient implements Closeable {
    * @return A map composed by the first level of KV pair of json object
    */
   protected static Map<String, String> handleResponse(HttpResponse response) 
throws IOException {
-    int code = response.getStatusLine().getStatusCode();
-    if (code != HttpStatus.SC_CREATED && code != HttpStatus.SC_OK) {
-      log.error("Failed : HTTP error code : " + 
response.getStatusLine().getStatusCode());
-      throw new AzkabanClientException("Failed : HTTP error code : " + 
response.getStatusLine().getStatusCode());
-    }
+    verifyStatusCode(response);
+    JsonObject json = getResponseJson(response);
+    return getFlatMap(json);
+  }
 
-    // Get response in string
+  protected static <T> T handleResponse(HttpResponse response, Class<T> 
responseClass) throws IOException {
+    verifyStatusCode(response);
+    JsonObject json = getResponseJson(response);
+
+    Gson gson = new Gson();
+    return gson.fromJson(json, responseClass);
+  }
+
+  private static JsonObject getResponseJson(HttpResponse response) throws 
IOException {
     HttpEntity entity = null;
     String jsonResponseString;
 
     try {
       entity = response.getEntity();
       jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8");
-      log.info("Response string: " + jsonResponseString);
+      log.debug("Response string: {}", jsonResponseString);
     } catch (Exception e) {
       throw new AzkabanClientException("Cannot convert response to a string", 
e);
     } finally {
@@ -225,27 +234,37 @@ public class AzkabanClient implements Closeable {
         EntityUtils.consume(entity);
       }
     }
+    return parseResponse(jsonResponseString);
+  }
 
-    return AzkabanClient.parseResponse(jsonResponseString);
+  protected static void verifyStatusCode(HttpResponse response) throws 
AzkabanClientException {
+    int code = response.getStatusLine().getStatusCode();
+    if (code != HttpStatus.SC_CREATED && code != HttpStatus.SC_OK) {
+      log.error("Failed : HTTP error code : " + 
response.getStatusLine().getStatusCode());
+      throw new AzkabanClientException("Failed : HTTP error code : " + 
response.getStatusLine().getStatusCode());
+    }
   }
 
-  static Map<String, String> parseResponse(String jsonResponseString) throws 
IOException {
-    // Parse Json
+  static Map<String, String> getFlatMap(JsonObject jsonObject) {
+    if (jsonObject == null) {
+      return null;
+    }
     Map<String, String> responseMap = new HashMap<>();
-    if (StringUtils.isNotBlank(jsonResponseString)) {
-      JsonObject jsonObject = new 
JsonParser().parse(jsonResponseString).getAsJsonObject();
-
-      // Handle error if any
-      handleResponseError(jsonObject);
-
-      // Get all responseKeys
-      for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
-        responseMap.put(entry.getKey(), 
entry.getValue().toString().replaceAll("\"", ""));
-      }
+    for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+      responseMap.put(entry.getKey(), 
entry.getValue().toString().replaceAll("\"", ""));
     }
     return responseMap;
   }
 
+  static JsonObject parseResponse(String jsonResponseString) throws 
IOException {
+    if (!StringUtils.isNotBlank(jsonResponseString)) {
+      return null;
+    }
+    JsonObject jsonObject = new 
JsonParser().parse(jsonResponseString).getAsJsonObject();
+    handleResponseError(jsonObject);
+    return jsonObject;
+  }
+
   private static void handleResponseError(JsonObject jsonObject) throws 
IOException {
     // Azkaban does not has a standard for error messages tag
     if (null != jsonObject.get(AzkabanClientParams.STATUS) &&
@@ -307,6 +326,22 @@ public class AzkabanClient implements Closeable {
   }
 
   /**
+   * Checks if the project with specified name exists in Azkaban
+   */
+  public Boolean projectExists(String projectName) throws 
AzkabanClientException {
+    try {
+      fetchProjectFlows(projectName);
+      return true;
+    } catch (AzkabanClientException e) {
+      // Azkaban does not return a strongly typed error code, so we are 
checking the message
+      if (e.getCause().getMessage().contains("doesn't exist")) {
+        return false;
+      } else {
+        throw e;
+      }
+    }
+  }
+  /**
    * Updates a project by uploading a new zip file. Before uploading any 
project zip files,
    * the project should be created first.
    *
@@ -387,9 +422,9 @@ public class AzkabanClient implements Closeable {
    */
   public AzkabanClientStatus fetchExecutionLog(String execId,
                                                String jobId,
-                                               String offset,
-                                               String length,
-                                               File ouf) throws 
AzkabanClientException {
+                                               long offset,
+                                               long length,
+                                               OutputStream logStream) throws 
AzkabanClientException {
     AzkabanMultiCallables.FetchExecLogCallable callable =
         AzkabanMultiCallables.FetchExecLogCallable.builder()
             .client(this)
@@ -397,7 +432,7 @@ public class AzkabanClient implements Closeable {
             .jobId(jobId)
             .offset(offset)
             .length(length)
-            .output(ouf)
+            .output(logStream)
             .build();
 
     return runWithRetry(callable, AzkabanClientStatus.class);
@@ -423,6 +458,21 @@ public class AzkabanClient implements Closeable {
   }
 
   /**
+   * Returns a list of flow ids in a specified project.
+   *
+   * @param projectName name of the project.
+   */
+  public AzkabanProjectFlowsStatus fetchProjectFlows(String projectName) 
throws AzkabanClientException {
+    AzkabanMultiCallables.FetchProjectFlowsCallable callable =
+            AzkabanMultiCallables.FetchProjectFlowsCallable.builder()
+                    .client(this)
+                    .projectName(projectName)
+                    .build();
+
+    return runWithRetry(callable, AzkabanProjectFlowsStatus.class);
+  }
+
+  /**
    * Given a project and user, add that user as a proxy user in the project.
    *
    * @param projectName project name
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
index 30981aa..344d3b2 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
@@ -17,13 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.io.FileUtils;
+import com.google.common.io.Closer;
+import lombok.Builder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
@@ -39,14 +34,15 @@ import org.apache.http.entity.mime.MultipartEntityBuilder;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Closer;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
+import java.io.File;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
 
 
 /**
@@ -130,7 +126,8 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpGet);
         closer.register(response);
-        AzkabanClient.handleResponse(response);
+        AzkabanClient.verifyStatusCode(response);
+
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
@@ -329,6 +326,47 @@ class AzkabanMultiCallables {
     }
   }
 
+
+  /**
+   * A callable that will fetch a flow status on Azkaban.
+   */
+  @Builder
+  static class FetchProjectFlowsCallable implements 
Callable<AzkabanProjectFlowsStatus> {
+    private AzkabanClient client;
+    private boolean invalidSession = false;
+    private String projectName;
+
+    @Override
+    public AzkabanProjectFlowsStatus call()
+            throws AzkabanClientException {
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, 
"fetchprojectflows"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, 
projectName));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
client.sessionId));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
+
+        HttpGet httpGet = new HttpGet(client.url + "/manager?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
+        httpGet.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpGet);
+        closer.register(response);
+
+        AzkabanProjectFlowsStatus.Project project =
+                AzkabanClient.handleResponse(response, 
AzkabanProjectFlowsStatus.Project.class);
+        return new AzkabanProjectFlowsStatus(project);
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot fetch project 
flows", e);
+      }
+    }
+  }
+
   /**
    * A callable that will fetch a flow log on Azkaban.
    */
@@ -337,9 +375,9 @@ class AzkabanMultiCallables {
     private AzkabanClient client;
     private String execId;
     private String jobId;
-    private String offset;
-    private String length;
-    private File output;
+    private long offset;
+    private long length;
+    private OutputStream output;
     private boolean invalidSession = false;
 
     @Override
@@ -352,8 +390,8 @@ class AzkabanMultiCallables {
         nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
client.sessionId));
         nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
         nvps.add(new BasicNameValuePair(AzkabanClientParams.JOBID, jobId));
-        nvps.add(new BasicNameValuePair(AzkabanClientParams.OFFSET, offset));
-        nvps.add(new BasicNameValuePair(AzkabanClientParams.LENGTH, length));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.OFFSET, 
String.valueOf(offset)));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.LENGTH, 
String.valueOf(length)));
 
         Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
         Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
@@ -364,7 +402,11 @@ class AzkabanMultiCallables {
         CloseableHttpResponse response = client.httpClient.execute(httpGet);
         closer.register(response);
         Map<String, String> map = AzkabanClient.handleResponse(response);
-        FileUtils.writeStringToFile(output, map.get(AzkabanClientParams.DATA), 
Charsets.UTF_8);
+
+        try (Writer logWriter = new OutputStreamWriter(output, 
StandardCharsets.UTF_8)) {
+          logWriter.write(map.get(AzkabanClientParams.DATA));
+        }
+
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectFlowsStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectFlowsStatus.java
new file mode 100644
index 0000000..c79464e
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectFlowsStatus.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+public class AzkabanProjectFlowsStatus extends 
AzkabanClientStatus<AzkabanProjectFlowsStatus.Project> {
+    public AzkabanProjectFlowsStatus(AzkabanProjectFlowsStatus.Project 
project) {
+        super(project);
+    }
+
+    // Those classes represent Azkaban API response
+    // For more details, see: 
https://azkaban.readthedocs.io/en/latest/ajaxApi.html#fetch-flows-of-a-project
+    @Getter
+    @AllArgsConstructor
+    public static class Project {
+        long projectId;
+        List<Flow> flows;
+    }
+
+    @Getter
+    @AllArgsConstructor
+    public static class Flow {
+        String flowId;
+    }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
index f491ad9..64133fe 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
@@ -17,9 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.gson.JsonObject;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.NameValuePair;
@@ -30,6 +28,10 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * A helper class which can get session id using Azkaban authentication 
mechanism.
  *
@@ -69,7 +71,9 @@ public class SessionHelper {
 
         // retrieve session id from entity
         String jsonResponseString = IOUtils.toString(entity.getContent(), 
"UTF-8");
-        String sessionId = 
AzkabanClient.parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID);
+        JsonObject jsonObject = 
AzkabanClient.parseResponse(jsonResponseString);
+        Map<String, String> responseMap = AzkabanClient.getFlatMap(jsonObject);
+        String sessionId = responseMap.get(AzkabanClientParams.SESSION_ID);
         EntityUtils.consume(entity);
         return sessionId;
       } catch (Exception e) {
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
index e648830..1c7fbd3 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -17,29 +17,30 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import lombok.extern.slf4j.Slf4j;
-
 
 
 /**
@@ -48,9 +49,14 @@ import lombok.extern.slf4j.Slf4j;
  * Please check https://azkaban.github.io/azkaban/docs/latest/ for how to 
setup Azkaban-solo-server.
  */
 @Slf4j
+@Test(enabled = false)
 public class AzkabanClientTest {
   private AzkabanClient client = null;
   private long sessionExpireInMin = 1;
+
+  String projectName;
+  String description;
+
   @BeforeClass
   public void setup() throws Exception {
     Config azkConfig = ConfigFactory.load("local-azkaban-service.conf");
@@ -65,55 +71,74 @@ public class AzkabanClientTest {
         .build();
   }
 
+  @BeforeMethod
+  public void testSetup() {
+    projectName = "test-project-" + System.currentTimeMillis() + "-" + 
UUID.randomUUID().toString().substring(0, 4);
+    description = "This is test project.";
+  }
+
+  @AfterMethod
+  public void testCleanup() throws AzkabanClientException {
+    this.client.deleteProject(projectName);
+  }
+
   @AfterClass
   public void cleanup() throws IOException {
     this.client.close();
   }
 
   private void ensureProjectExist(String projectName, String description) 
throws AzkabanClientException {
-    // make sure it is in a clean state
-    this.client.deleteProject(projectName);
-
-    // make sure the project is created successfully
     this.client.createProject(projectName, description);
   }
 
-  @Test(enabled = false)
-  public void testFetchLog() throws AzkabanClientException {
-    String execId = "11211956";
-    String jobId = "tracking-hourly-bucket1";
-
-    // fetch log
-    this.client.fetchExecutionLog(execId, jobId, "0", "100000000", new 
File("/tmp/sample.log"));
-  }
+  public void testFetchLog() throws Exception {
+    String flowName = "test-exec-flow";
+    String jobId = "test-exec-flow";
 
+    ensureProjectExist(projectName, description);
+    File zipFile = createAzkabanZip(flowName);
+    this.client.uploadProjectZip(projectName, zipFile);
 
-  @Test(enabled = false)
-  public void testCreateProject() throws AzkabanClientException {
-    String projectName = "project-create";
-    String description = "This is a create project test.";
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, 
flowName, Maps.newHashMap());
+    String execId = execStatus.getResponse().getExecId();
+
+    ByteArrayOutputStream logStream = null;
+
+    // Logs are not instantly available. Retrying several times until the job 
has started, and logs are present.
+    int maxTries = 10;
+    for (int i = 0; i < maxTries; i++) {
+      logStream = new ByteArrayOutputStream();
+
+      Thread.sleep(1000);
+      try {
+        this.client.fetchExecutionLog(execId, jobId, 0, 100000000, logStream);
+        break;
+      } catch (Exception ex) {
+        if (i == maxTries - 1) {
+          throw ex;
+        }
+      }
+    }
 
-    ensureProjectExist(projectName, description);
+    Assert.assertTrue(logStream.size() > 0);
+  }
 
-    // the second time creation should fail
+  public void testProjectCreateAndDelete() throws AzkabanClientException {
     this.client.createProject(projectName, description);
+    this.client.deleteProject(projectName);
   }
 
-  @Test(enabled = false)
-  public void testDeleteProject() throws AzkabanClientException {
-    String projectName = "project-delete";
-    String description = "This is a delete project test.";
+  public void testProjectExistenceCheck() throws AzkabanClientException {
+    Assert.assertFalse(this.client.projectExists(projectName));
 
-    ensureProjectExist(projectName, description);
+    this.client.createProject(projectName, description);
+    Assert.assertTrue(this.client.projectExists(projectName));
 
-    // delete the new project
     this.client.deleteProject(projectName);
+    Assert.assertFalse(this.client.projectExists(projectName));
   }
 
-  @Test(enabled = false)
   public void testUploadZip() throws IOException {
-    String projectName = "project-upload";
-    String description = "This is a upload project test.";
     String flowName = "test-upload";
 
     ensureProjectExist(projectName, description);
@@ -131,10 +156,7 @@ public class AzkabanClientTest {
     }
   }
 
-  @Test(enabled = false)
   public void testExecuteFlow() throws IOException {
-    String projectName = "project-execFlow";
-    String description = "This is a flow execution test.";
     String flowName = "test-exec-flow";
 
     ensureProjectExist(projectName, description);
@@ -148,10 +170,7 @@ public class AzkabanClientTest {
     log.info("Execid: {}", execStatus.getResponse().execId);
   }
 
-  @Test(enabled = false)
   public void testExecuteFlowWithParams() throws IOException {
-    String projectName = "project-execFlow-Param";
-    String description = "This is a flow execution test.";
     String flowName = "test-exec-flow-param";
 
     ensureProjectExist(projectName, description);
@@ -169,10 +188,7 @@ public class AzkabanClientTest {
     log.info("Execid: {}", execStatus.getResponse().execId);
   }
 
-  @Test(enabled = false)
   public void testExecuteFlowWithOptions() throws IOException {
-    String projectName = "project-execFlow-Option";
-    String description = "This is a flow execution test.";
     String flowName = "test-exec-flow-options";
 
     ensureProjectExist(projectName, description);
@@ -188,10 +204,7 @@ public class AzkabanClientTest {
     log.info("Execid: {}", execStatus.getResponse().execId);
   }
 
-  @Test(enabled = false)
   public void testFetchFlowExecution() throws Exception {
-    String projectName = "project-fetch-flow-exec";
-    String description = "This is a flow execution fetch test.";
     String flowName = "test-fetch-flow-executions";
 
     ensureProjectExist(projectName, description);
@@ -218,12 +231,27 @@ public class AzkabanClientTest {
 
   @Test(enabled = false)
   public void testSessionExpiration() throws Exception {
-    String projectName = "project-session-expiration-test";
-    String description = "This is a session expiration test.";
     Thread.sleep(sessionExpireInMin * 60 * 1000);
     ensureProjectExist(projectName, description);
   }
 
+  public void testGettingProjectFlows() throws IOException {
+    String flowName = "test-exec-flow";
+
+    ensureProjectExist(projectName, description);
+
+    AzkabanProjectFlowsStatus status = 
this.client.fetchProjectFlows(projectName);
+    Assert.assertTrue(status.getResponse().getFlows().isEmpty());
+
+    File zipFile = createAzkabanZip(flowName);
+    this.client.uploadProjectZip(projectName, zipFile);
+
+    status = this.client.fetchProjectFlows(projectName);
+    List<AzkabanProjectFlowsStatus.Flow> flows = 
status.getResponse().getFlows();
+    Assert.assertEquals(1, flows.size());
+    Assert.assertEquals(flowName, flows.get(0).flowId);
+  }
+
   private File createAzkabanZip(String flowName) throws IOException {
     Properties jobProps = new Properties();
     jobProps.load(this.getClass().getClassLoader().

Reply via email to