Support to execute Azkaban project from Orchestrator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0bb5139c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0bb5139c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0bb5139c Branch: refs/heads/master Commit: 0bb5139c8822ded33295b9eb118b67df1cb9f418 Parents: e285202 Author: Abhishek Tiwari <[email protected]> Authored: Wed Aug 30 03:36:04 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Aug 30 03:36:04 2017 -0700 ---------------------------------------------------------------------- .../orchestration/AzkabanAjaxAPIClient.java | 340 +++++++++++-------- .../modules/orchestration/AzkabanJobHelper.java | 70 +++- .../orchestration/AzkabanProjectConfig.java | 2 +- .../AzkabanSpecExecutorInstanceProducer.java | 59 ++-- gradle/scripts/dependencyDefinitions.gradle | 1 + 5 files changed, 307 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java index 31fc753..d0b8471 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java @@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.File; import java.io.IOException; import 
java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; @@ -37,7 +35,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.EncoderException; import org.apache.commons.codec.net.URLCodec; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -55,13 +52,14 @@ import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.TrustStrategy; import com.google.common.base.Splitter; +import com.google.common.collect.Maps; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @Slf4j public class AzkabanAjaxAPIClient { - private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); // TODO: Ensure GET call urls do not grow too big @@ -71,93 +69,107 @@ public class AzkabanAjaxAPIClient { private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000; private static final URLCodec codec = new URLCodec(); + /*** + * Authenticate a user and obtain a session.id from response. Once a session.id has been obtained, + * until the session expires, this id can be used to do any API requests with a proper permission granted. + * A session expires if user log's out, changes machine, browser or location, if Azkaban is restarted, + * or if the session expires. The default session timeout is 24 hours (one day). User can re-login irrespective + * of wheter the session has expired or not. For the same user, a new session will always override the old one. + * @param username Username. + * @param password Password. + * @param azkabanServerUrl Azkaban Server Url. + * @return Session Id. 
+ * @throws IOException + * @throws EncoderException + */ public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl) throws IOException, EncoderException { // Create post request - HttpPost postRequest = new HttpPost(azkabanServerUrl); - StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login", - username, codec.encode(password))); - input.setContentType("application/x-www-form-urlencoded"); - postRequest.setEntity(input); - postRequest.setHeader("X-Requested-With", "XMLHttpRequest"); + Map<String, String> params = Maps.newHashMap(); + params.put("action", "login"); + params.put("username", username); + params.put("password", codec.encode(password)); - // Make the call, get response - @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse response = httpClient.execute(postRequest); - - return handleResponse(response, "session.id").get("session.id"); + return executePostRequest(preparePostRequest(azkabanServerUrl, null, params)).get("session.id"); } + /*** + * Get project.id for a Project Name. + * @param sessionId Session Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @return Project Id. + * @throws IOException + */ public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { // Note: Every get call to Azkaban provides a projectId in response, so we have are using fetchProjectFlows call // .. 
because it does not need any additional params other than project name - // Create get request - HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&" - + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId, - azkabanProjectConfig.getAzkabanProjectName())); + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "fetchprojectflows"); + params.put("project", azkabanProjectConfig.getAzkabanProjectName()); - // Make the call, get response - @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse response = httpClient.execute(getRequest); - return handleResponse(response, "projectId").get("projectId"); + return executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager", + sessionId, params)).get("projectId"); } + /*** + * Creates an Azkaban project and uploads the zip file. If proxy user and group permissions are specified in + * Azkaban Project Config, then this method also adds it to the project configuration. + * @param sessionId Session Id. + * @param zipFilePath Zip file to upload. + * @param azkabanProjectConfig Azkaban Project Config. + * @return Project Id. 
+ * @throws IOException + */ public static String createAzkabanProject(String sessionId, String zipFilePath, AzkabanProjectConfig azkabanProjectConfig) throws IOException { + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "executeFlow"); + params.put("name", azkabanProjectConfig.getAzkabanProjectName()); + params.put("description", azkabanProjectConfig.getAzkabanProjectDescription()); - String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl(); - String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName(); - String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription(); - String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers(); - - // Create post request - HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create"); - StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId, - azkabanProjectName, azkabanProjectDescription)); - input.setContentType("application/x-www-form-urlencoded"); - postRequest.setEntity(input); - postRequest.setHeader("X-Requested-With", "XMLHttpRequest"); - - // Make the call, get response - @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse response = httpClient.execute(postRequest); - handleResponse(response); + executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + + "/manager?action=create", sessionId, params)); // Add proxy user if any if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) { Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get()); for (String user : proxyUsers) { - addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user); + addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), user); } } // Add group permissions if any // TODO: Support users (not just groups), and different permission types // 
(though we can add users, we only support groups at the moment and award them with admin permissions) - if (StringUtils.isNotBlank(groupAdminUsers)) { - String [] groups = StringUtils.split(groupAdminUsers, ","); + if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) { + String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ","); for (String group : groups) { - addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, - false, false); + addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), + group, true, true, false, false,false, + false); } } // Upload zip file to azkaban and return projectId - return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath); + return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), zipFilePath); } + /*** + * Replace an existing Azkaban Project. If proxy user and group permissions are specified in + * Azkaban Project Config, then this method also adds it to the project configuration. + * @param sessionId Session Id. + * @param zipFilePath Zip file to upload. + * @param azkabanProjectConfig Azkaban Project Config. + * @return Project Id. 
+ * @throws IOException + */ public static String replaceAzkabanProject(String sessionId, String zipFilePath, AzkabanProjectConfig azkabanProjectConfig) throws IOException { - - String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl(); - String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName(); - String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription(); - String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers(); - // Change project description - changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription); + changeProjectDescription(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), + azkabanProjectConfig.getAzkabanProjectName(), azkabanProjectConfig.getAzkabanProjectDescription()); // Add proxy user if any // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban @@ -166,7 +178,8 @@ public class AzkabanAjaxAPIClient { if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) { Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get()); for (String user : proxyUsers) { - addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user); + addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), + azkabanProjectConfig.getAzkabanProjectName(), user); } } @@ -175,12 +188,13 @@ public class AzkabanAjaxAPIClient { // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban // 2. 
Adding same group-user will return an error message, but we will ignore it // (though we can add users, we only support groups at the moment and award them with admin permissions) - if (StringUtils.isNotBlank(groupAdminUsers)) { - String [] groups = StringUtils.split(groupAdminUsers, ","); + if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) { + String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ","); for (String group : groups) { try { - addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false, - false); + addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), + azkabanProjectConfig.getAzkabanProjectName(), group, true, true, + false, false, false,false); } catch (IOException e) { // Ignore if group already exists, we cannot list existing groups; so its okay to attempt adding exiting // .. groups @@ -192,21 +206,20 @@ public class AzkabanAjaxAPIClient { } // Upload zip file to azkaban and return projectId - return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath); + return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), + azkabanProjectConfig.getAzkabanProjectName(), zipFilePath); } private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName, String proxyUser) throws IOException { - // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it) - HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&" - + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser)); + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "addProxyUser"); + params.put("project", azkabanProjectName); + params.put("name", proxyUser); - // Make the call, get response - @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse 
response = httpClient.execute(getRequest); - handleResponse(response); + executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params)); } private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName, @@ -219,100 +232,155 @@ public class AzkabanAjaxAPIClient { // Create get request (adding same normal user permission multiple times will throw an error, but we cannot // list whole list of permissions anyways) - HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&" - + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s" - + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name, - isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission)); - - // Make the call, get response - @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse response = httpClient.execute(getRequest); - handleResponse(response); + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "addPermission"); + params.put("project", azkabanProjectName); + params.put("name", name); + params.put("group", Boolean.toString(isGroup)); + params.put("permissions[admin]", Boolean.toString(adminPermission)); + params.put("permissions[read]", Boolean.toString(readPermission)); + params.put("permissions[write]", Boolean.toString(writePermission)); + params.put("permissions[execute]", Boolean.toString(executePermission)); + params.put("permissions[schedule]", Boolean.toString(schedulePermission)); + + executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params)); } - private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName, - String jobZipFile) + /*** + * Schedule the Azkaban Project to run with a schedule. + * @param sessionId Session Id. 
+ * @param azkabanProjectId Project Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @throws IOException + */ + public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId, + AzkabanProjectConfig azkabanProjectConfig) throws IOException { + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "scheduleFlow"); + params.put("projectName", azkabanProjectConfig.getAzkabanProjectName()); + params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName()); + params.put("projectId", azkabanProjectId); + params.put("scheduleTime", getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, + LOW_NETWORK_TRAFFIC_END_HOUR, JOB_START_DELAY_MINUTES)); + params.put("scheduleDate", getScheduledDateInAzkabanFormat()); + params.put("is_recurring", "off"); + + // Run once OR push down schedule (TODO: Enable when push down is finalized) + // if (azkabanProjectConfig.isScheduled()) { + // params.put("is_recurring", "on"); + // params.put("period", "1d"); + // } else { + // params.put("is_recurring", "off"); + // } + + executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/schedule", sessionId, params)); + } - // Create post request - HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager"); - HttpEntity entity = MultipartEntityBuilder - .create() - .addTextBody("session.id", sessionId) - .addTextBody("ajax", "upload") - .addBinaryBody("file", new File(jobZipFile), - ContentType.create("application/zip"), azkabanProjectName + ".zip") - .addTextBody("project", azkabanProjectName) - .build(); - postRequest.setEntity(entity); + private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName, + String projectDescription) + throws IOException { + String encodedProjectDescription; + try { + encodedProjectDescription = new URLCodec().encode(projectDescription); + } catch (EncoderException e) { + throw new IOException("Could not encode Azkaban 
project description", e); + } - // Make the call, get response - @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse response = httpClient.execute(postRequest); + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "changeDescription"); + params.put("project", azkabanProjectName); + params.put("description", encodedProjectDescription); - // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban - return handleResponse(response, "projectId").get("projectId"); + executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params)); } - public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId, + /*** + * Execute an existing Azkaban project. + * @param sessionId Session Id. + * @param azkabanProjectId Project Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @throws IOException + */ + public static void executeAzkabanProject(String sessionId, String azkabanProjectId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { - String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl(); - String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName(); - String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName(); + Map<String, String> params = Maps.newHashMap(); + params.put("ajax", "executeFlow"); + params.put("projectName", azkabanProjectConfig.getAzkabanProjectName()); + params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName()); + + executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params)); + } + + private static HttpGet prepareGetRequest(String requestUrl, String sessionId, Map<String, String> params) + throws IOException { + // Create get request + StringBuilder stringEntityBuilder = new StringBuilder(); + stringEntityBuilder.append(String.format("?session.id=%s", sessionId)); + for (Map.Entry<String, String> entry : 
params.entrySet()) { + stringEntityBuilder.append(String.format("&%s=%s", entry.getKey(), entry.getValue())); + } - String scheduleString = "is_recurring=off"; // run only once - // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule -// if (azkabanProjectConfig.isScheduled()) { -// scheduleString = "is_recurring=on&period=1d"; // schedule once every day -// } + return new HttpGet(requestUrl + stringEntityBuilder); + } + private static HttpPost preparePostRequest(String requestUrl, String sessionId, Map<String, String> params) + throws IOException { // Create post request - HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule"); - StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow" - + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s", - sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId, - getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR, - JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString)); + HttpPost postRequest = new HttpPost(requestUrl); + StringBuilder stringEntityBuilder = new StringBuilder(); + stringEntityBuilder.append(String.format("session.id=%s", sessionId)); + for (Map.Entry<String, String> entry : params.entrySet()) { + if (stringEntityBuilder.length() > 0) { + stringEntityBuilder.append("&"); + } + stringEntityBuilder.append(String.format("%s=%s", entry.getKey(), entry.getValue())); + } + StringEntity input = new StringEntity(stringEntityBuilder.toString()); input.setContentType("application/x-www-form-urlencoded"); postRequest.setEntity(input); postRequest.setHeader("X-Requested-With", "XMLHttpRequest"); + return postRequest; + } + + private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException { + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = 
httpClient.execute(getRequest); + return handleResponse(response); + } + + private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException { // Make the call, get response @Cleanup CloseableHttpClient httpClient = getHttpClient(); HttpResponse response = httpClient.execute(postRequest); - handleResponse(response); + return handleResponse(response); } - private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName, - String projectDescription) + private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName, + String jobZipFile) throws IOException { - HttpGet getRequest; - try { - // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it) - getRequest = new HttpGet(String - .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl, - sessionId, azkabanProjectName, new URLCodec().encode(projectDescription))); - } catch (EncoderException e) { - throw new IOException("Could not encode Azkaban project description", e); - } + // Create post request + HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager"); + HttpEntity entity = MultipartEntityBuilder + .create() + .addTextBody("session.id", sessionId) + .addTextBody("ajax", "upload") + .addBinaryBody("file", new File(jobZipFile), + ContentType.create("application/zip"), azkabanProjectName + ".zip") + .addTextBody("project", azkabanProjectName) + .build(); + postRequest.setEntity(entity); // Make the call, get response @Cleanup CloseableHttpClient httpClient = getHttpClient(); - HttpResponse response = httpClient.execute(getRequest); - handleResponse(response); - } + HttpResponse response = httpClient.execute(postRequest); - public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer, - AzkabanProjectConfig azkabanProjectConfig) - throws IOException { - boolean isGoUrl = false; 
- if (!StringUtils.isBlank(uberdistcp2ToolServer)) { - if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) { - isGoUrl = true; - } - } + // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban + return handleResponse(response, "projectId").get("projectId"); } private static CloseableHttpClient getHttpClient() @@ -350,11 +418,9 @@ public class AzkabanAjaxAPIClient { // Handle error if any handleResponseError(jsonObject); - // Get required responseKeys - if (ArrayUtils.isNotEmpty(responseKeys)) { - for (String responseKey : responseKeys) { - responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", "")); - } + // Get all responseKeys + for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) { + responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", "")); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java index 627761e..a74a6ad 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java @@ -46,6 +46,13 @@ import com.google.common.collect.Lists; @Slf4j public class AzkabanJobHelper { + /*** + * Checks if an Azkaban project exists by name. + * @param sessionId Session Id. + * @param azkabanProjectConfig Azkaban Project Config that contains project name. 
+ * @return true if project exists else false. + * @throws IOException + */ public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists"); @@ -74,6 +81,13 @@ public class AzkabanJobHelper { } } + /*** + * Get Project Id by an Azkaban Project Name. + * @param sessionId Session Id. + * @param azkabanProjectConfig Azkaban Project Config that contains project Name. + * @return Project Id. + * @throws IOException + */ public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName()); @@ -83,6 +97,14 @@ public class AzkabanJobHelper { return projectId; } + /*** + * Create project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to + * Azkaban, setting permissions and schedule. + * @param sessionId Session Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @return Project Id. + * @throws IOException + */ public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName()); @@ -98,6 +120,15 @@ public class AzkabanJobHelper { return projectId; } + /*** + * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to + * Azkaban, setting permissions and schedule. + * @param sessionId Session Id. + * @param azkabanProjectId Project Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @return Project Id. 
+ * @throws IOException + */ public static String replaceAzkabanJob(String sessionId, String azkabanProjectId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName()); @@ -113,6 +144,13 @@ public class AzkabanJobHelper { return projectId; } + /*** + * Schedule an already created Azkaban project. + * @param sessionId Session Id. + * @param azkabanProjectId Project Id. + * @param azkabanProjectConfig Azkaban Project Config that contains schedule information. + * @throws IOException + */ public static void scheduleJob(String sessionId, String azkabanProjectId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { @@ -120,6 +158,13 @@ public class AzkabanJobHelper { AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig); } + /*** + * Change the schedule of an already created Azkaban project. + * @param sessionId Session Id. + * @param azkabanProjectId Project Id. + * @param azkabanProjectConfig Azkaban Project Config that contains schedule information. + * @throws IOException + */ public static void changeJobSchedule(String sessionId, String azkabanProjectId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { @@ -127,7 +172,28 @@ public class AzkabanJobHelper { AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig); } - public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig) + /*** + * Execute an already created Azkaban project. + * @param sessionId Session Id. + * @param azkabanProjectId Project Id. + * @param azkabanProjectConfig Azkaban Project Config that contains schedule information. 
+ * @throws IOException + */ + public static void executeJob(String sessionId, String azkabanProjectId, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Executing Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName()); + AzkabanAjaxAPIClient.executeAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig); + } + + /*** + * Create Azkaban project zip file. + * @param azkabanProjectConfig Azkaban Project Config that contains information about what to include in + * zip file. + * @return Zip file path. + * @throws IOException + */ + private static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig) throws IOException { log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName()); String workDir = azkabanProjectConfig.getWorkDir(); @@ -153,7 +219,7 @@ public class AzkabanJobHelper { jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl); filesToAdd.add(jobJarFile); } catch (IOException e) { - if(failIfJarNotFound) { + if (failIfJarNotFound) { throw e; } log.warn("Could not download: " + jobJarFile); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java index ddae3d9..2bac65d 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java @@ -69,7 +69,7 @@ public class AzkabanProjectConfig { 
this.azkabanProjectName = constructProjectName(jobSpec, config); this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY); this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY); - this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY); + this.azkabanGroupAdminUsers = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY, ""); this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null)); // Azkaban Project Zip http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java index 5471f0c..f73bc6c 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java @@ -39,7 +39,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst implements SpecExecutorInstanceProducer<Spec>, Closeable { // Session Id for GaaS User - private String sessionId; + private String _sessionId; public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) { @@ -51,7 +51,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst String 
azkabanPassword = getAzkabanPassword(config); String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY); - sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl); + _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl); } catch (IOException | EncoderException e) { throw new RuntimeException("Could not authenticate with Azkaban", e); } @@ -82,37 +82,46 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst @Override public Future<?> addSpec(Spec addedSpec) { // If project already exists, execute it - - // If project does not already exists, create and execute it - AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec); try { - _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName()); - - // Deleted project also returns true if-project-exists check, so optimistically first create the project - // .. (it will create project if it was never created or deleted), if project exists it will fail with - // .. appropriate exception message, catch that and run in replace project mode if force overwrite is - // .. 
specified - try { - createNewAzkabanProject(sessionId, azkabanProjectConfig); - } catch (IOException e) { - if ("Project already exists.".equalsIgnoreCase(e.getMessage())) { - if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(), - ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) { - _log.info("Project already exists for this Spec, but force overwrite specified"); - updateExistingAzkabanProject(sessionId, azkabanProjectConfig); + AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec); + boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig); + + // If project does not already exists, create and execute it + if (azkabanProjectExists) { + _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName()); + AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig), + azkabanProjectConfig); + } else { + _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName()); + + // Deleted project also returns true if-project-exists check, so optimistically first create the project + // .. (it will create project if it was never created or deleted), if project exists it will fail with + // .. appropriate exception message, catch that and run in replace project mode if force overwrite is + // .. 
specified + try { + createNewAzkabanProject(_sessionId, azkabanProjectConfig); + } catch (IOException e) { + if ("Project already exists.".equalsIgnoreCase(e.getMessage())) { + if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(), + ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) { + _log.info("Project already exists for this Spec, but force overwrite specified"); + updateExistingAzkabanProject(_sessionId, azkabanProjectConfig); + } else { + _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s", + azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName())); + } } else { - _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s", - azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName())); + throw e; } - } else { - throw e; } } + + } catch (IOException e) { throw new RuntimeException("Issue in setting up Azkaban project.", e); } - return null; + return new CompletedFuture<>(_config, null); } @Override @@ -121,7 +130,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec); try { - updateExistingAzkabanProject(sessionId, azkabanProjectConfig); + updateExistingAzkabanProject(_sessionId, azkabanProjectConfig); } catch (IOException e) { throw new RuntimeException("Issue in setting up Azkaban project.", e); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gradle/scripts/dependencyDefinitions.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index 41e1485..1e86b96 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -69,6 +69,7 @@ ext.externalDependency = [ "hiveExec": 
"org.apache.hive:hive-exec:" + hiveVersion + ":core", "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion, "httpclient": "org.apache.httpcomponents:httpclient:4.5.2", + "httpmime": "org.apache.httpcomponents:httpmime:4.5.2", "httpcore": "org.apache.httpcomponents:httpcore:4.4.4", "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3", "jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",
