Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 27a54f05e -> 78da23b11


[GOBBLIN-572] Add AzkabanClient

Closes #2437 from yukuai518/azkcli


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/78da23b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/78da23b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/78da23b1

Branch: refs/heads/master
Commit: 78da23b11d2e6a8e05e5edbf33db0e425589fbbc
Parents: 27a54f0
Author: Kuai Yu <[email protected]>
Authored: Tue Aug 28 15:36:37 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Aug 28 15:36:37 2018 -0700

----------------------------------------------------------------------
 .../orchestration/AzkabanAjaxAPIClient.java     |  66 +--
 .../modules/orchestration/AzkabanClient.java    | 475 +++++++++++++++++++
 .../orchestration/AzkabanClientException.java   |  35 ++
 .../orchestration/AzkabanClientParams.java      |  43 ++
 .../orchestration/AzkabanClientStatus.java      |  60 +++
 .../orchestration/AzkabanExecuteFlowStatus.java |  42 ++
 .../AzkabanFetchExecuteFlowStatus.java          |  45 ++
 .../orchestration/AzkabanClientTest.java        | 287 +++++++++++
 .../resources/azkakaban-job-basic.properties    |   7 +
 .../test/resources/local-azkaban-service.conf   |  25 +
 10 files changed, 1029 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index f54d6a5..3494c5c 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -18,23 +18,17 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.commons.codec.EncoderException;
 import org.apache.commons.codec.net.URLCodec;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -54,12 +48,16 @@ import org.apache.http.ssl.TrustStrategy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Maps;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
+@Deprecated
+/**
+ * This format of azkaban client is obsolete. Please use {@link AzkabanClient} 
as the new alternative.
+ */
 public class AzkabanAjaxAPIClient {
   private static Splitter SPLIT_ON_COMMA = 
Splitter.on(",").omitEmptyStrings().trimResults();
 
@@ -367,7 +365,7 @@ public class AzkabanAjaxAPIClient {
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
     HttpResponse response = httpClient.execute(getRequest);
-    return handleResponse(response);
+    return AzkabanClient.handleResponse(response);
   }
 
   @VisibleForTesting
@@ -375,7 +373,7 @@ public class AzkabanAjaxAPIClient {
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
     HttpResponse response = httpClient.execute(postRequest);
-    return handleResponse(response);
+    return AzkabanClient.handleResponse(response);
   }
 
   private static String uploadZipFileToAzkaban(String sessionId, String 
azkabanServerUrl, String azkabanProjectName,
@@ -399,7 +397,7 @@ public class AzkabanAjaxAPIClient {
     HttpResponse response = httpClient.execute(postRequest);
 
     // Obtaining projectId is hard. Uploading zip file is one avenue to get it 
from Azkaban
-    return handleResponse(response, "projectId").get("projectId");
+    return AzkabanClient.handleResponse(response).get("projectId");
   }
 
   private static CloseableHttpClient getHttpClient()
@@ -417,50 +415,6 @@ public class AzkabanAjaxAPIClient {
     }
   }
 
-  private static Map<String, String> handleResponse(HttpResponse response, 
String... responseKeys)
-      throws IOException {
-    if (response.getStatusLine().getStatusCode() != 201 && 
response.getStatusLine().getStatusCode()!= 200) {
-      log.error("Failed : HTTP error code : " + 
response.getStatusLine().getStatusCode());
-      throw new RuntimeException("Failed : HTTP error code : " + 
response.getStatusLine().getStatusCode());
-    }
-
-    // Get response in string
-    InputStream in = response.getEntity().getContent();
-    String jsonResponseString = IOUtils.toString(in, "UTF-8");
-    log.info("Response string: " + jsonResponseString);
-
-    // Parse Json
-    Map<String, String> responseMap = new HashMap<>();
-    if (StringUtils.isNotBlank(jsonResponseString)) {
-      JsonObject jsonObject = new 
JsonParser().parse(jsonResponseString).getAsJsonObject();
-
-      // Handle error if any
-      handleResponseError(jsonObject);
-
-      // Get all responseKeys
-      for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
-        responseMap.put(entry.getKey(), 
entry.getValue().toString().replaceAll("\"", ""));
-      }
-    }
-
-    return responseMap;
-  }
-
-  private static void handleResponseError(JsonObject jsonObject) throws 
IOException {
-    // Azkaban does not has a standard for error messages tag
-    if (null != jsonObject.get("status") && 
"error".equalsIgnoreCase(jsonObject.get("status").toString()
-        .replaceAll("\"", ""))) {
-      String message = (null != jsonObject.get("message")) ?
-          jsonObject.get("message").toString().replaceAll("\"", "") : "Issue 
in creating project";
-      throw new IOException(message);
-    }
-
-    if (null != jsonObject.get("error")) {
-      String error = jsonObject.get("error").toString().replaceAll("\"", "");
-      throw new IOException(error);
-    }
-  }
-
   /***
    * Generate a random scheduled time between specified execution time window 
in the Azkaban compatible format
    * which is: hh,mm,a,z Eg. ScheduleTime=12,00,PM,PDT

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
new file mode 100644
index 0000000..75d088d
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import lombok.Builder;
+
+
+/**
+ * A simple client that uses Ajax API to communicate with Azkaban server.
+ *
+ * Lombok will not consider fields from the superclass in the generated builder class. For a workaround, we put
+ * {@code @Builder} in constructors to allow Builder inheritance.
+ *
+ * @see <a href="https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/">Reducing boilerplate code with Project Lombok</a>
+ * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#ajax-api">Azkaban Ajax API</a>
+ */
+public class AzkabanClient implements Closeable {
+  // Credentials sent with the login request.
+  protected final String username;
+  protected final String password;
+  // Base url of the Azkaban server; request paths are appended to it.
+  protected final String url;
+  protected final long sessionExpireInMin; // default value is 12h.
+
+  // Session id returned by the last login and the System.nanoTime() reading taken right after it.
+  protected String sessionId;
+  protected long sessionCreationTime = 0;
+  protected CloseableHttpClient client;
+  // The logger is never reassigned, so it is declared final.
+  private static final Logger log = LoggerFactory.getLogger(AzkabanClient.class);
+
+  /**
+   * Child class should have a different builderMethodName.
+   *
+   * Stores the given settings, creates the http client through {@link #getClient()} and then logs in
+   * through {@link #initializeSession()}. Both methods are overridable and are invoked here, i.e.
+   * before the constructor body of a subclass has run.
+   *
+   * @param username user name sent with the login request
+   * @param password password sent with the login request
+   * @param url base url of the Azkaban server; paths such as "/manager" are appended to it
+   * @param sessionExpireInMin session age, in minutes, after which a new login is performed.
+   *        NOTE(review): no default is applied here, so a builder that never sets this value
+   *        presumably passes 0 and forces a login before every request - confirm intended.
+   * @throws AzkabanClientException if the http client cannot be created or the login fails
+   */
+  @Builder
+  protected AzkabanClient(String username,
+                          String password,
+                          String url,
+                          long sessionExpireInMin)
+      throws AzkabanClientException {
+    this.username = username;
+    this.password = password;
+    this.url = url;
+    this.sessionExpireInMin = sessionExpireInMin;
+    this.client = getClient();
+    this.initializeSession();
+  }
+
+  /**
+   * Create a session id that can be used in the future to communicate with Azkaban server.
+   *
+   * Posts a login request with the configured user name and password, stores the returned session id
+   * and records the time of the login, which {@code refreshSession} uses to decide when to log in again.
+   *
+   * @throws AzkabanClientException if the request fails, the server reports an error, or the
+   *         response does not contain a session id
+   */
+  protected void initializeSession() throws AzkabanClientException {
+    try {
+      HttpPost httpPost = new HttpPost(this.url);
+      List<NameValuePair> nvps = new ArrayList<>();
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login"));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password));
+      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+      String newSessionId;
+      // try-with-resources closes the response even when reading or parsing the body fails.
+      try (CloseableHttpResponse response = this.client.execute(httpPost)) {
+        HttpEntity entity = response.getEntity();
+
+        // retrieve session id from entity
+        String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8");
+        newSessionId = parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID);
+        EntityUtils.consume(entity);
+      }
+
+      // Fail fast instead of remembering a missing session id and sending it with every later request.
+      if (StringUtils.isBlank(newSessionId)) {
+        throw new AzkabanClientException("Login response does not contain " + AzkabanClientParams.SESSION_ID);
+      }
+
+      this.sessionId = newSessionId;
+      this.sessionCreationTime = System.nanoTime();
+    } catch (Exception e) {
+      throw new AzkabanClientException("Azkaban client cannot initialize session.", e);
+    }
+  }
+
+  /**
+   * Builds the {@link CloseableHttpClient} used to communicate with Azkaban server.
+   * Derived class can configure different http client by overriding this method.
+   *
+   * @return A closeable http client.
+   * @throws AzkabanClientException if the http client cannot be created
+   */
+  protected CloseableHttpClient getClient() throws AzkabanClientException {
+    try {
+      // Self sign SSL: the socket factory is built from an SSL context whose trust material uses
+      // TrustSelfSignedStrategy.
+      SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(
+          new SSLContextBuilder()
+              .loadTrustMaterial(null, (TrustStrategy) new TrustSelfSignedStrategy())
+              .build());
+
+      // Socket, connect and connection-request timeouts all use the same value.
+      RequestConfig timeouts = RequestConfig.copy(RequestConfig.DEFAULT)
+          .setSocketTimeout(10000)
+          .setConnectTimeout(10000)
+          .setConnectionRequestTimeout(10000)
+          .build();
+
+      return HttpClientBuilder.create()
+          .disableCookieManagement()
+          .useSystemProperties()
+          .setDefaultRequestConfig(timeouts)
+          .setConnectionManager(new BasicHttpClientConnectionManager())
+          .setSSLSocketFactory(socketFactory)
+          .build();
+    } catch (Exception e) {
+      throw new AzkabanClientException("HttpClient cannot be created", e);
+    }
+  }
+
+  /**
+   * Logs in again when the current session is older than {@code sessionExpireInMin} minutes.
+   *
+   * @throws AzkabanClientException if a new session cannot be initialized
+   */
+  private void refreshSession() throws AzkabanClientException {
+    Preconditions.checkArgument(this.sessionCreationTime != 0);
+    long sessionAgeNanos = System.nanoTime() - this.sessionCreationTime;
+    long maxAgeNanos = Duration.ofMinutes(this.sessionExpireInMin).toNanos();
+    if (sessionAgeNanos <= maxAgeNanos) {
+      return;
+    }
+    log.info("Session expired. Generating a new session.");
+    this.initializeSession();
+  }
+
+  /**
+   * Converts a {@link HttpResponse} into a {@code Map<String, String>}.
+   * Put protected modifier here so it is visible to {@link AzkabanAjaxAPIClient}.
+   *
+   * @param response An http response returned by {@link org.apache.http.client.HttpClient} execution.
+   *                 This should be JSON string.
+   * @return A map composed by the first level of KV pair of json object
+   * @throws IOException if the status code is neither 200 nor 201, the body cannot be read,
+   *         or the body reports an error
+   */
+  protected static Map<String, String> handleResponse(HttpResponse response) throws IOException {
+    int statusCode = response.getStatusLine().getStatusCode();
+    boolean accepted = statusCode == HttpStatus.SC_CREATED || statusCode == HttpStatus.SC_OK;
+    if (!accepted) {
+      log.error("Failed : HTTP error code : " + statusCode);
+      throw new AzkabanClientException("Failed : HTTP error code : " + statusCode);
+    }
+
+    // Read the whole body as a UTF-8 string.
+    HttpEntity body = null;
+    String json;
+    try {
+      body = response.getEntity();
+      json = IOUtils.toString(body.getContent(), "UTF-8");
+      log.info("Response string: " + json);
+    } catch (Exception e) {
+      throw new AzkabanClientException("Cannot convert response to a string", e);
+    } finally {
+      // Consume the entity whether or not reading succeeded.
+      if (body != null) {
+        EntityUtils.consume(body);
+      }
+    }
+
+    return parseResponse(json);
+  }
+
+  /**
+   * Parses a JSON object string into a flat map of its top-level entries.
+   * Each value is the JSON text of the element with every double quote removed.
+   *
+   * @param jsonResponseString response body; a blank string yields an empty map
+   * @return a map from the top-level keys to their quote-stripped values
+   * @throws IOException if the JSON object reports an error
+   */
+  private static Map<String, String> parseResponse(String jsonResponseString) throws IOException {
+    Map<String, String> topLevelEntries = new HashMap<>();
+    if (StringUtils.isBlank(jsonResponseString)) {
+      return topLevelEntries;
+    }
+
+    JsonObject root = new JsonParser().parse(jsonResponseString).getAsJsonObject();
+
+    // Handle error if any
+    handleResponseError(root);
+
+    // Get all responseKeys
+    for (Map.Entry<String, JsonElement> field : root.entrySet()) {
+      String text = field.getValue().toString();
+      topLevelEntries.put(field.getKey(), text.replace("\"", ""));
+    }
+    return topLevelEntries;
+  }
+
+  /**
+   * Throws when the JSON response describes a failure.
+   * Azkaban does not have a standard tag for error messages, so two shapes are checked: a "status"
+   * entry whose value is "error" (optionally accompanied by a "message" entry), and an "error" entry.
+   *
+   * @param jsonObject the parsed response body
+   * @throws IOException always an {@link AzkabanClientException} carrying the reported message
+   */
+  private static void handleResponseError(JsonObject jsonObject) throws IOException {
+    JsonElement status = jsonObject.get(AzkabanClientParams.STATUS);
+    if (status != null && AzkabanClientParams.ERROR.equalsIgnoreCase(unquote(status))) {
+      JsonElement message = jsonObject.get(AzkabanClientParams.MESSAGE);
+      // Both failure shapes raise the same exception type; it is still an IOException for callers.
+      throw new AzkabanClientException(message != null ? unquote(message) : "Unknown issue");
+    }
+
+    JsonElement error = jsonObject.get(AzkabanClientParams.ERROR);
+    if (error != null) {
+      throw new AzkabanClientException(unquote(error));
+    }
+  }
+
+  /** Returns the JSON text of the element with every double quote removed. */
+  private static String unquote(JsonElement element) {
+    return element.toString().replace("\"", "");
+  }
+
+  /**
+   * Creates a project.
+   *
+   * @param projectName project name
+   * @param description project description
+   *
+   * @return A status object indicating if AJAX request is successful. Any exception raised while
+   *         sending the request or handling the response is captured in the returned status.
+   */
+  public AzkabanClientStatus createProject(
+      String projectName,
+      String description) {
+    try {
+      refreshSession();
+      HttpPost httpPost = new HttpPost(this.url + "/manager");
+      List<NameValuePair> nvps = new ArrayList<>();
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "create"));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, projectName));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.DESCRIPTION, description));
+      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+      httpPost.setHeaders(new Header[]{contentType, requestType});
+
+      // try-with-resources closes the response and, when both the handling and the close fail,
+      // reports the handling failure instead of the close failure.
+      try (CloseableHttpResponse response = this.client.execute(httpPost)) {
+        handleResponse(response);
+        return new AzkabanClientStatus.SUCCESS();
+      }
+    } catch (Exception e) {
+      return new AzkabanClientStatus.FAIL("Azkaban client cannot create project.", e);
+    }
+  }
+
+  /**
+   * Deletes a project. Currently no response message will be returned after finishing
+   * the delete operation. Thus success status is always expected.
+   *
+   * @param projectName project name
+   *
+   * @return A status object indicating if AJAX request is successful.
+   */
+  public AzkabanClientStatus deleteProject(String projectName) {
+    try {
+      refreshSession();
+
+      Header[] headers = {
+          new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"),
+          new BasicHeader("X-Requested-With", "XMLHttpRequest")
+      };
+
+      List<NameValuePair> query = new ArrayList<>();
+      query.add(new BasicNameValuePair(AzkabanClientParams.DELETE, "true"));
+      query.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
+      query.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+
+      HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(query, "UTF-8"));
+      httpGet.setHeaders(headers);
+
+      // The response is not inspected; it is only closed.
+      this.client.execute(httpGet).close();
+      return new AzkabanClientStatus.SUCCESS();
+    } catch (Exception e) {
+      return new AzkabanClientStatus.FAIL("Azkaban client cannot delete project = "
+          + projectName, e);
+    }
+  }
+
+  /**
+   * Updates a project by uploading a new zip file. Before uploading any project zip files,
+   * the project should be created first.
+   *
+   * @param projectName project name
+   * @param zipFile  zip file
+   *
+   * @return A status object indicating if AJAX request is successful. Any exception raised while
+   *         sending the request or handling the response is captured in the returned status.
+   */
+  public AzkabanClientStatus uploadProjectZip(
+      String projectName,
+      File zipFile) {
+    try {
+      refreshSession();
+      HttpPost httpPost = new HttpPost(this.url + "/manager");
+      HttpEntity entity = MultipartEntityBuilder.create()
+          .addTextBody(AzkabanClientParams.SESSION_ID, sessionId)
+          .addTextBody(AzkabanClientParams.AJAX, "upload")
+          .addTextBody(AzkabanClientParams.PROJECT, projectName)
+          .addBinaryBody("file", zipFile,
+              ContentType.create("application/zip"), zipFile.getName())
+          .build();
+      httpPost.setEntity(entity);
+
+      // try-with-resources closes the response and, when both the handling and the close fail,
+      // reports the handling failure instead of the close failure.
+      try (CloseableHttpResponse response = this.client.execute(httpPost)) {
+        handleResponse(response);
+        return new AzkabanClientStatus.SUCCESS();
+      }
+    } catch (Exception e) {
+      return new AzkabanClientStatus.FAIL("Azkaban client cannot upload zip to project = "
+          + projectName, e);
+    }
+  }
+
+  /**
+   * Execute a flow by providing flow parameters and options. The project and flow should be created first.
+   *
+   * @param projectName project name
+   * @param flowName  flow name
+   * @param flowOptions  flow options; may be null, in which case no option is added
+   * @param flowParameters  flow parameters; may be null, in which case no parameter is added
+   *
+   * @return The status object which contains success status and execution id. Any exception raised
+   *         while sending the request or handling the response is captured in the returned status.
+   */
+  public AzkabanExecuteFlowStatus executeFlowWithOptions(
+      String projectName,
+      String flowName,
+      Map<String, String> flowOptions,
+      Map<String, String> flowParameters) {
+
+    try {
+      refreshSession();
+      HttpPost httpPost = new HttpPost(this.url + "/executor");
+      List<NameValuePair> nvps = new ArrayList<>();
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "executeFlow"));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.FLOW, flowName));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.CONCURRENT_OPTION, "ignore"));
+
+      addFlowOptions(nvps, flowOptions);
+      addFlowParameters(nvps, flowParameters);
+
+      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+      httpPost.setHeaders(new Header[]{contentType, requestType});
+
+      // try-with-resources closes the response and, when both the handling and the close fail,
+      // reports the handling failure instead of the close failure.
+      try (CloseableHttpResponse response = this.client.execute(httpPost)) {
+        Map<String, String> map = handleResponse(response);
+        return new AzkabanExecuteFlowStatus(
+            new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID)));
+      }
+    } catch (Exception e) {
+      return new AzkabanExecuteFlowStatus("Azkaban client cannot execute flow = "
+          + flowName, e);
+    }
+  }
+
+  /**
+   * Execute a flow with flow parameters. The project and flow should be created first.
+   * Delegates to {@link #executeFlowWithOptions} and passes {@code null} as the flow options.
+   *
+   * @param projectName project name
+   * @param flowName  flow name
+   * @param flowParameters  flow parameters
+   *
+   * @return The status object which contains success status and execution id.
+   */
+  public AzkabanExecuteFlowStatus executeFlow(
+      String projectName,
+      String flowName,
+      Map<String, String> flowParameters) {
+    return executeFlowWithOptions(projectName, flowName, null, flowParameters);
+  }
+
+  /**
+   * Given an execution id, fetches all the detailed information of that execution, including a list of all the job executions.
+   *
+   * @param execId execution id to be fetched.
+   *
+   * @return The status object which contains success status and all the detailed information of that execution.
+   *         Any exception raised while sending the request or handling the response is captured in
+   *         the returned status.
+   */
+  public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) {
+    try {
+      refreshSession();
+      List<NameValuePair> nvps = new ArrayList<>();
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow"));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
+
+      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+      HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+      httpGet.setHeaders(new Header[]{contentType, requestType});
+
+      // try-with-resources closes the response and, when both the handling and the close fail,
+      // reports the handling failure instead of the close failure.
+      try (CloseableHttpResponse response = this.client.execute(httpGet)) {
+        Map<String, String> map = handleResponse(response);
+        return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
+      }
+    } catch (Exception e) {
+      return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
+          + "fetch execId " + execId, e);
+    }
+  }
+
+  /**
+   * Appends every flow parameter whose key and value are both non-blank to the request pairs as
+   * {@code flowOverride[key]}. A null map adds nothing.
+   *
+   * @param nvps request pairs to append to
+   * @param flowParams flow parameters, may be null
+   */
+  private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) {
+    if (flowParams == null) {
+      return;
+    }
+    for (Map.Entry<String, String> param : flowParams.entrySet()) {
+      String name = param.getKey();
+      String text = param.getValue();
+      if (StringUtils.isBlank(name) || StringUtils.isBlank(text)) {
+        continue;
+      }
+      log.debug("New flow parameter added:" + name + "-->" + text);
+      nvps.add(new BasicNameValuePair("flowOverride[" + name + "]", text));
+    }
+  }
+
+  /**
+   * Appends every flow option to the request pairs under its own key. A null map adds nothing.
+   *
+   * @param nvps request pairs to append to
+   * @param flowOptions flow options, may be null
+   */
+  private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) {
+    if (flowOptions == null) {
+      return;
+    }
+    for (Map.Entry<String, String> option : flowOptions.entrySet()) {
+      String name = option.getKey();
+      String setting = option.getValue();
+      log.debug("New flow option added:" + name + "-->" + setting);
+      nvps.add(new BasicNameValuePair(name, setting));
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.client.close(); // closes the http client assigned in the constructor; its IOException propagates
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
new file mode 100644
index 0000000..da94c03
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+/**
+ * Exception raised by {@link AzkabanClient}.
+ *
+ * It extends {@link IOException}, so code that handles {@code IOException} also handles it.
+ */
+public class AzkabanClientException extends IOException {
+  private static final long serialVersionUID = 11324144L;
+
+  /**
+   * @param message description of the failure
+   * @param e the exception that caused the failure
+   */
+  public AzkabanClientException(String message, Exception e) {
+    super(message, e);
+  }
+
+  /**
+   * Same as the constructor above, but accepts any {@link Throwable} as the cause.
+   *
+   * @param message description of the failure
+   * @param cause the throwable that caused the failure
+   */
+  public AzkabanClientException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param message description of the failure
+   */
+  public AzkabanClientException(String message) {
+    super(message);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
new file mode 100644
index 0000000..27bb4f2
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * A collection of attributes used by {@link AzkabanClient} to form an HTTP request,
+ * and parse the HTTP response. More details can be found at
+ * <a href="https://azkaban.github.io/azkaban/docs/latest/#ajax-api">Azkaban Ajax API</a>.
+ */
+public final class AzkabanClientParams {
+  public static final String ACTION = "action";
+  public static final String USERNAME = "username";
+  public static final String PASSWORD = "password";
+  public static final String SESSION_ID = "session.id";
+
+  public static final String NAME = "name";
+  public static final String DELETE = "delete";
+  public static final String DESCRIPTION = "description";
+  public static final String PROJECT = "project";
+  public static final String FLOW = "flow";
+  public static final String AJAX = "ajax";
+  public static final String CONCURRENT_OPTION = "concurrentOption";
+  public static final String MESSAGE = "message";
+  public static final String STATUS = "status";
+  public static final String ERROR = "error";
+  public static final String EXECID = "execid";
+
+  // Constants holder only: not meant to be instantiated.
+  private AzkabanClientParams() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
new file mode 100644
index 0000000..132f4ad
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import lombok.Getter;
+
+/**
+ * Base class describing the outcome of an {@link AzkabanClient} call.
+ *
+ * <p>A status is either a success, optionally carrying a response of type {@code RS}, or a
+ * failure carrying a message and the {@link Throwable} that caused it. Every field is assigned
+ * exactly once at construction time; the accessors are generated by Lombok's {@link Getter}.
+ *
+ * @param <RS> type of the response payload carried by a successful status
+ */
+@Getter
+public abstract class AzkabanClientStatus<RS> {
+  private final boolean success;
+  private final String failMsg;
+  private final Throwable throwable;
+
+  private final RS response;
+
+  /**
+   * Creates a successful status without a response payload.
+   */
+  public AzkabanClientStatus() {
+    this(true, "", null, null);
+  }
+
+  /**
+   * Creates a successful status carrying the given response.
+   *
+   * @param response payload returned by the call
+   */
+  public AzkabanClientStatus(RS response) {
+    this(true, "", null, response);
+  }
+
+  /**
+   * Creates a failed status; no response is attached.
+   *
+   * @param failMsg description of the failure
+   * @param throwable cause of the failure
+   */
+  public AzkabanClientStatus(String failMsg, Throwable throwable) {
+    this(false, failMsg, throwable, null);
+  }
+
+  // Single assignment point for all fields, so they can be declared final.
+  private AzkabanClientStatus(boolean success, String failMsg, Throwable throwable, RS response) {
+    this.success = success;
+    this.failMsg = failMsg;
+    this.throwable = throwable;
+    this.response = response;
+  }
+
+  /**
+   * This status captures basic success.
+   */
+  public static class SUCCESS extends AzkabanClientStatus<Object> {
+    public SUCCESS() {
+      super();
+    }
+  }
+
+  /**
+   * This status captures basic failure (fail message and throwable).
+   */
+  public static class FAIL extends AzkabanClientStatus<Object> {
+    public FAIL(String failMsg, Throwable throwable) {
+      super(failMsg, throwable);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
new file mode 100644
index 0000000..be9af89
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * This status captures the execution id returned by {@link AzkabanClient#executeFlowWithOptions}.
+ */
+public class AzkabanExecuteFlowStatus extends 
AzkabanClientStatus<AzkabanExecuteFlowStatus.ExecuteId> {
+  /**
+   * Creates a successful status carrying the given execution id.
+   *
+   * @param executeId wrapper around the execution id
+   */
+  public AzkabanExecuteFlowStatus(ExecuteId executeId) {
+    super(executeId);
+  }
+
+  /**
+   * Creates a failed status.
+   *
+   * @param failMsg description of the failure
+   * @param throwable cause of the failure
+   */
+  public AzkabanExecuteFlowStatus(String failMsg, Throwable throwable) {
+    super(failMsg, throwable);
+  }
+
+  /**
+   * Holder for the execution id; the getter and the one-argument constructor are generated by Lombok.
+   */
+  @Getter
+  @AllArgsConstructor
+  public static class ExecuteId {
+    String execId;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
new file mode 100644
index 0000000..1e0afe8
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * This status captures the execution details returned by {@link AzkabanClient#fetchFlowExecution(String)}.
+ *
+ * The execution details are captured by {@link Execution}
+ */
+public class AzkabanFetchExecuteFlowStatus extends 
AzkabanClientStatus<AzkabanFetchExecuteFlowStatus.Execution> {
+  /**
+   * Creates a successful status carrying the given execution details.
+   *
+   * @param exec wrapper around the execution details
+   */
+  public AzkabanFetchExecuteFlowStatus(AzkabanFetchExecuteFlowStatus.Execution 
exec) {
+    super(exec);
+  }
+
+  /**
+   * Creates a failed status.
+   *
+   * @param failMsg description of the failure
+   * @param throwable cause of the failure
+   */
+  public AzkabanFetchExecuteFlowStatus(String failMsg, Throwable throwable) {
+    super(failMsg, throwable);
+  }
+
+  /**
+   * Holder for the execution details; the getter and the one-argument constructor are generated by Lombok.
+   */
+  @Getter
+  @AllArgsConstructor
+  public static class Execution {
+    // NOTE(review): presumably the key/value fields of the fetch response - confirm against AzkabanClient.
+    Map<String, String> map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
 
b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
new file mode 100644
index 0000000..9e86b25
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+/**
+ * This test is disabled by default because it assumes the Azkaban-solo-server 
is setup on localhost:8081.
+ *
+ * Please check https://azkaban.github.io/azkaban/docs/latest/ for how to 
setup Azkaban-solo-server.
+ */
+@Slf4j
+public class AzkabanClientTest {
+  private AzkabanClient client = null;
+  private FileSystem fs = null;
+  private long sessionExpireInMin = 1;
+  @BeforeClass
+  public void setup() throws Exception {
+    Config azkConfig = ConfigFactory.load("local-azkaban-service.conf");
+    String userName = 
azkConfig.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+    String password = 
azkConfig.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+    String url = 
azkConfig.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+    this.client = AzkabanClient.builder()
+        .username(userName)
+        .password(password)
+        .url(url)
+        .sessionExpireInMin(sessionExpireInMin)
+        .build();
+    String uri = ConfigurationKeys.LOCAL_FS_URI;
+    this.fs = FileSystem.get(URI.create(uri), new Configuration());
+  }
+
+  @AfterClass
+  public void cleanup() throws IOException {
+    this.client.close();
+  }
+
+  private void ensureProjectExist(String projectName, String description) {
+    AzkabanClientStatus status;
+    // make sure it is in a clean state
+    status = this.client.deleteProject(projectName);
+    Assert.assertTrue(status.isSuccess());
+
+    // make sure the project is created successfully
+    status = this.client.createProject(projectName, description);
+    Assert.assertTrue(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testCreateProject() {
+    String projectName = "project-create";
+    String description = "This is a create project test.";
+    AzkabanClientStatus status;
+
+    ensureProjectExist(projectName, description);
+
+    // the second time creation should fail
+    status = this.client.createProject(projectName, description);
+    Assert.assertFalse(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testDeleteProject() {
+    String projectName = "project-delete";
+    String description = "This is a delete project test.";
+    AzkabanClientStatus status;
+
+    ensureProjectExist(projectName, description);
+
+    // delete the new project
+    status = this.client.deleteProject(projectName);
+    Assert.assertTrue(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testUploadZip() throws IOException {
+    String projectName = "project-upload";
+    String description = "This is a upload project test.";
+    String flowName = "test-upload";
+    AzkabanClientStatus status;
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    status = this.client.uploadProjectZip(projectName, zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    // upload Zip to an non-existed project
+    status = this.client.uploadProjectZip("Non-existed-project", zipFile);
+    Assert.assertFalse(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testExecuteFlow() throws IOException {
+    String projectName = "project-execFlow";
+    String description = "This is a flow execution test.";
+    String flowName = "test-exec-flow";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, 
zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, 
flowName, Maps.newHashMap());
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().execId);
+  }
+
+  @Test(enabled = false)
+  public void testExecuteFlowWithParams() throws IOException {
+    String projectName = "project-execFlow-Param";
+    String description = "This is a flow execution test.";
+    String flowName = "test-exec-flow-param";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, 
zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    Map<String, String> flowParams = Maps.newHashMap();
+    flowParams.put("gobblin.source", "DummySource");
+    flowParams.put("gobblin.dataset.pattern", 
"/data/tracking/MessageActionEvent/hourly/*/*/*/*");
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, 
flowName, flowParams);
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().execId);
+  }
+
+  @Test(enabled = false)
+  public void testExecuteFlowWithOptions() throws IOException {
+    String projectName = "project-execFlow-Option";
+    String description = "This is a flow execution test.";
+    String flowName = "test-exec-flow-options";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, 
zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    Map<String, String> flowOptions = Maps.newHashMap();
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = 
this.client.executeFlowWithOptions(projectName, flowName, flowOptions, 
Maps.newHashMap());
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().execId);
+  }
+
+  @Test(enabled = false)
+  public void testFetchFlowExecution() throws Exception {
+    String projectName = "project-fetch-flow-exec";
+    String description = "This is a flow execution fetch test.";
+    String flowName = "test-fetch-flow-executions";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, 
zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    Map<String, String> flowOptions = Maps.newHashMap();
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = 
this.client.executeFlowWithOptions(projectName, flowName, flowOptions, 
Maps.newHashMap());
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().execId);
+
+    // wait for the job started and failed
+    Thread.sleep(3000);
+
+    // job should fail
+    AzkabanFetchExecuteFlowStatus fetchExecuteFlowStatus = 
this.client.fetchFlowExecution(execStatus.getResponse().execId);
+    Assert.assertTrue(fetchExecuteFlowStatus.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testSessionExpiration() throws Exception {
+    String projectName = "project-session-expiration-test";
+    String description = "This is a session expiration test.";
+    Thread.sleep(sessionExpireInMin * 60 * 1000);
+    ensureProjectExist(projectName, description);
+  }
+
+  private File createAzkabanZip(String flowName) throws IOException {
+    Properties jobProps = new Properties();
+    jobProps.load(this.getClass().getClassLoader().
+        getResourceAsStream("azkakaban-job-basic.properties"));
+
+    String basePath = "/tmp/testAzkabanZip";
+    this.fs.delete(new Path(basePath), true);
+
+    // create testAzkabanZip/test dir
+    File jobDir = new File(basePath, flowName);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    // create testAzkabanZip/test/test.job
+    File jobFile = new File(jobDir,flowName + ".job");
+    OutputStream jobOut = new FileOutputStream(jobFile);
+    jobProps.store(jobOut, "Writing a test job file.");
+
+    // create testAzkabanZip/test.zip
+    FileOutputStream fos = new FileOutputStream(jobDir.getPath() + ".zip");
+    ZipOutputStream zos = new ZipOutputStream(fos);
+    addDirToZipArchive(zos, jobDir, null);
+    zos.close();
+    fos.close();
+    return new File(jobDir.getPath() + ".zip");
+  }
+
+  private static void addDirToZipArchive(ZipOutputStream zos, File fileToZip, 
String parentDirectoryName) throws IOException {
+    if (fileToZip == null || !fileToZip.exists()) {
+      return;
+    }
+
+    String zipEntryName = fileToZip.getName();
+    if (parentDirectoryName!=null && !parentDirectoryName.isEmpty()) {
+      zipEntryName = parentDirectoryName + "/" + fileToZip.getName();
+    }
+
+    if (fileToZip.isDirectory()) {
+      for (File file : fileToZip.listFiles()) {
+        addDirToZipArchive(zos, file, zipEntryName);
+      }
+    } else {
+      byte[] buffer = new byte[1024];
+      FileInputStream fis = new FileInputStream(fileToZip);
+      zos.putNextEntry(new ZipEntry(zipEntryName));
+      int length;
+      while ((length = fis.read(buffer)) > 0) {
+        zos.write(buffer, 0, length);
+      }
+      zos.closeEntry();
+      fis.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
 
b/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
new file mode 100644
index 0000000..4a2ba83
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
@@ -0,0 +1,7 @@
+# Job definition loaded by AzkabanClientTest and written out as <flowName>.job inside the uploaded zip.
+type=hadoopJava
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+
+# Azkaban properties
+job.name=Test
+job.group=TestGroup
+job.description=A test job
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf 
b/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf
new file mode 100644
index 0000000..2aa6e6c
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default values
+# These values should generally come from the template being used by the Service
+
+# Default config for Azkaban Infrastructure
+# (Current defaults are for the default local Azkaban setup)
+gobblin.service.azkaban.username=azkaban
+gobblin.service.azkaban.password=azkaban
+# No trailing ';' after the quoted value: HOCON would append it to the URL string.
+gobblin.service.azkaban.server.url="http://localhost:8081/"
\ No newline at end of file

Reply via email to