Repository: incubator-gobblin Updated Branches: refs/heads/master 90be15f47 -> 9b9fec817
Azkaban Orchestrator for GaaS Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/08e60efd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/08e60efd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/08e60efd Branch: refs/heads/master Commit: 08e60efd328485554344b179da2a54466d84628b Parents: f96379e Author: Abhishek Tiwari <[email protected]> Authored: Tue Aug 8 12:41:02 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Aug 8 12:41:02 2017 -0700 ---------------------------------------------------------------------- .../service-azkaban-hello-world.template | 24 + .../orchestration/AzkabanAjaxAPIClient.java | 435 +++++++++++++++++++ .../modules/orchestration/AzkabanJobHelper.java | 272 ++++++++++++ .../orchestration/AzkabanProjectConfig.java | 123 ++++++ .../AzkabanSpecExecutorInstance.java | 106 +++++ .../AzkabanSpecExecutorInstanceProducer.java | 158 +++++++ .../orchestration/ServiceAzkabanConfigKeys.java | 38 ++ .../main/resources/default-service-azkaban.conf | 33 ++ .../service-azkaban-hello-world.template | 0 9 files changed, 1189 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-example/src/main/resources/service-azkaban-hello-world.template ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/service-azkaban-hello-world.template b/gobblin-example/src/main/resources/service-azkaban-hello-world.template new file mode 100644 index 0000000..36a7418 --- /dev/null +++ b/gobblin-example/src/main/resources/service-azkaban-hello-world.template @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gobblin.service.azkaban.username=<CHANGE ME> +gobblin.service.azkaban.password=<CHANGE ME> +gobblin.service.azkaban.server.url=<CHANGE ME> +gobblin.service.azkaban.project.namePrefix=GobblinService_ +gobblin.service.azkaban.project.description="Gobblin Service has setup this project" +gobblin.service.azkaban.project.flowName="GobblinServiceFlow" +gobblin.service.azkaban.project.groupAdmins="" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java new file mode 100644 index 0000000..31fc753 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.codec.EncoderException; +import org.apache.commons.codec.net.URLCodec; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.BasicCookieStore; +import 
org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.TrustStrategy; + +import com.google.common.base.Splitter; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + + +@Slf4j +public class AzkabanAjaxAPIClient { + + private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); + + // TODO: Ensure GET call urls do not grow too big + private static final int LOW_NETWORK_TRAFFIC_BEGIN_HOUR = 17; + private static final int LOW_NETWORK_TRAFFIC_END_HOUR = 22; + private static final int JOB_START_DELAY_MINUTES = 5; + private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000; + private static final URLCodec codec = new URLCodec(); + + public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl) + throws IOException, EncoderException { + // Create post request + HttpPost postRequest = new HttpPost(azkabanServerUrl); + StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login", + username, codec.encode(password))); + input.setContentType("application/x-www-form-urlencoded"); + postRequest.setEntity(input); + postRequest.setHeader("X-Requested-With", "XMLHttpRequest"); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(postRequest); + + return handleResponse(response, "session.id").get("session.id"); + } + + public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { + // Note: Every get call to Azkaban provides a projectId in response, so we have are using fetchProjectFlows call + // .. 
because it does not need any additional params other than project name + // Create get request + HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&" + + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId, + azkabanProjectConfig.getAzkabanProjectName())); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(getRequest); + return handleResponse(response, "projectId").get("projectId"); + } + + public static String createAzkabanProject(String sessionId, String zipFilePath, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + + String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl(); + String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName(); + String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription(); + String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers(); + + // Create post request + HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create"); + StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId, + azkabanProjectName, azkabanProjectDescription)); + input.setContentType("application/x-www-form-urlencoded"); + postRequest.setEntity(input); + postRequest.setHeader("X-Requested-With", "XMLHttpRequest"); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(postRequest); + handleResponse(response); + + // Add proxy user if any + if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) { + Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get()); + for (String user : proxyUsers) { + addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user); + } + } + + // Add group permissions if any + // TODO: Support users (not 
just groups), and different permission types + // (though we can add users, we only support groups at the moment and award them with admin permissions) + if (StringUtils.isNotBlank(groupAdminUsers)) { + String [] groups = StringUtils.split(groupAdminUsers, ","); + for (String group : groups) { + addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, + false, false); + } + } + + // Upload zip file to azkaban and return projectId + return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath); + } + + public static String replaceAzkabanProject(String sessionId, String zipFilePath, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + + String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl(); + String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName(); + String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription(); + String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers(); + + // Change project description + changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription); + + // Add proxy user if any + // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban + // 2. Adding same proxy user multiple times is a non-issue + // Add proxy user if any + if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) { + Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get()); + for (String user : proxyUsers) { + addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user); + } + } + + // Add group permissions if any + // TODO: Support users (not just groups), and different permission types + // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban + // 2. 
Adding same group-user will return an error message, but we will ignore it + // (though we can add users, we only support groups at the moment and award them with admin permissions) + if (StringUtils.isNotBlank(groupAdminUsers)) { + String [] groups = StringUtils.split(groupAdminUsers, ","); + for (String group : groups) { + try { + addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false, + false); + } catch (IOException e) { + // Ignore if group already exists, we cannot list existing groups; so its okay to attempt adding exiting + // .. groups + if (!"Group permission already exists.".equalsIgnoreCase(e.getMessage())) { + throw e; + } + } + } + } + + // Upload zip file to azkaban and return projectId + return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath); + } + + private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName, + String proxyUser) + throws IOException { + + // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it) + HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&" + + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser)); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(getRequest); + handleResponse(response); + } + + private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName, + String name, boolean isGroup, boolean adminPermission, boolean readPermission, boolean writePermission, + boolean executePermission, boolean schedulePermission) + throws IOException { + + // NOTE: We are not listing the permissions before adding them, because Azkaban in its current state only + // .. 
returns user permissions and not group permissions + + // Create get request (adding same normal user permission multiple times will throw an error, but we cannot + // list whole list of permissions anyways) + HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&" + + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s" + + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name, + isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission)); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(getRequest); + handleResponse(response); + } + + private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName, + String jobZipFile) + throws IOException { + + // Create post request + HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager"); + HttpEntity entity = MultipartEntityBuilder + .create() + .addTextBody("session.id", sessionId) + .addTextBody("ajax", "upload") + .addBinaryBody("file", new File(jobZipFile), + ContentType.create("application/zip"), azkabanProjectName + ".zip") + .addTextBody("project", azkabanProjectName) + .build(); + postRequest.setEntity(entity); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(postRequest); + + // Obtaining projectId is hard. 
Uploading zip file is one avenue to get it from Azkaban + return handleResponse(response, "projectId").get("projectId"); + } + + public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl(); + String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName(); + String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName(); + + String scheduleString = "is_recurring=off"; // run only once + // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule +// if (azkabanProjectConfig.isScheduled()) { +// scheduleString = "is_recurring=on&period=1d"; // schedule once every day +// } + + // Create post request + HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule"); + StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow" + + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s", + sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId, + getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR, + JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString)); + input.setContentType("application/x-www-form-urlencoded"); + postRequest.setEntity(input); + postRequest.setHeader("X-Requested-With", "XMLHttpRequest"); + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(postRequest); + handleResponse(response); + } + + private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName, + String projectDescription) + throws IOException { + + HttpGet getRequest; + try { + // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it) + getRequest = new HttpGet(String + 
.format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl, + sessionId, azkabanProjectName, new URLCodec().encode(projectDescription))); + } catch (EncoderException e) { + throw new IOException("Could not encode Azkaban project description", e); + } + + // Make the call, get response + @Cleanup CloseableHttpClient httpClient = getHttpClient(); + HttpResponse response = httpClient.execute(getRequest); + handleResponse(response); + } + + public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + boolean isGoUrl = false; + if (!StringUtils.isBlank(uberdistcp2ToolServer)) { + if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) { + isGoUrl = true; + } + } + } + + private static CloseableHttpClient getHttpClient() + throws IOException { + try { + // Self sign SSL + SSLContextBuilder builder = new SSLContextBuilder(); + builder.loadTrustMaterial(null, (TrustStrategy) new TrustSelfSignedStrategy()); + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build()); + + // Create client + return HttpClients.custom().setSSLSocketFactory(sslsf).setDefaultCookieStore(new BasicCookieStore()).build(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { + throw new IOException("Issue with creating http client", e); + } + } + + private static Map<String, String> handleResponse(HttpResponse response, String... 
responseKeys) + throws IOException { + if (response.getStatusLine().getStatusCode() != 201 && response.getStatusLine().getStatusCode()!= 200) { + log.error("Failed : HTTP error code : " + response.getStatusLine().getStatusCode()); + throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode()); + } + + // Get response in string + InputStream in = response.getEntity().getContent(); + String jsonResponseString = IOUtils.toString(in, "UTF-8"); + log.info("Response string: " + jsonResponseString); + + // Parse Json + Map<String, String> responseMap = new HashMap<>(); + if (StringUtils.isNotBlank(jsonResponseString)) { + JsonObject jsonObject = new JsonParser().parse(jsonResponseString).getAsJsonObject(); + + // Handle error if any + handleResponseError(jsonObject); + + // Get required responseKeys + if (ArrayUtils.isNotEmpty(responseKeys)) { + for (String responseKey : responseKeys) { + responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", "")); + } + } + } + + return responseMap; + } + + private static void handleResponseError(JsonObject jsonObject) throws IOException { + // Azkaban does not has a standard for error messages tag + if (null != jsonObject.get("status") && "error".equalsIgnoreCase(jsonObject.get("status").toString() + .replaceAll("\"", ""))) { + String message = (null != jsonObject.get("message")) ? + jsonObject.get("message").toString().replaceAll("\"", "") : "Issue in creating project"; + throw new IOException(message); + } + + if (null != jsonObject.get("error")) { + String error = jsonObject.get("error").toString().replaceAll("\"", ""); + throw new IOException(error); + } + } + + /*** + * Generate a random scheduled time between specified execution time window in the Azkaban compatible format + * which is: hh,mm,a,z Eg. 
ScheduleTime=12,00,PM,PDT + * + * @param windowStartHour Window start hour in 24 hr (HH) format (inclusive) + * @param windowEndHour Window end hour in 24 hr (HH) format (exclusive) + * @param delayMinutes If current time is within window, then additional delay for bootstrapping if desired + * @return Scheduled time string of the format hh,mm,a,z + */ + public static String getScheduledTimeInAzkabanFormat(int windowStartHour, int windowEndHour, int delayMinutes) { + // Validate + if (windowStartHour < 0 || windowEndHour > 23 || windowStartHour >= windowEndHour) { + throw new IllegalArgumentException("Window start should be less than window end, and both should be between " + + "0 and 23"); + } + if (delayMinutes < 0 || delayMinutes > 59) { + throw new IllegalArgumentException("Delay in minutes should be between 0 and 59 (inclusive)"); + } + + // Setup window + Calendar windowStartTime = Calendar.getInstance(); + windowStartTime.set(Calendar.HOUR_OF_DAY, windowStartHour); + windowStartTime.set(Calendar.MINUTE, 0); + windowStartTime.set(Calendar.SECOND, 0); + + Calendar windowEndTime = Calendar.getInstance(); + windowEndTime.set(Calendar.HOUR_OF_DAY, windowEndHour); + windowEndTime.set(Calendar.MINUTE, 0); + windowEndTime.set(Calendar.SECOND, 0); + + // Check if current time is between windowStartTime and windowEndTime, then let the execution happen + // after delayMinutes minutes + Calendar now = Calendar.getInstance(); + if (now.after(windowStartTime) && now.before(windowEndTime)) { + // Azkaban takes a few seconds / a minute to bootstrap, + // so extra few minutes get the first execution to run instantly + now.add(Calendar.MINUTE, delayMinutes); + + return new SimpleDateFormat("hh,mm,a,z").format(now.getTime()); + } + + // Current time is not between windowStartTime and windowEndTime, so get random execution time for next day + int allowedSchedulingWindow = (int)((windowEndTime.getTimeInMillis() - windowStartTime.getTimeInMillis()) / + MILLISECONDS_IN_HOUR); + int 
randomHourInWindow = new Random(System.currentTimeMillis()).nextInt(allowedSchedulingWindow); + int randomMinute = new Random(System.currentTimeMillis()).nextInt(60); + windowStartTime.add(Calendar.HOUR, randomHourInWindow); + windowStartTime.set(Calendar.MINUTE, randomMinute); + + return new SimpleDateFormat("hh,mm,a,z").format(windowStartTime.getTime()); + } + + private static String getScheduledDateInAzkabanFormat() { + // Eg. ScheduleDate=07/22/2014" + return new SimpleDateFormat("MM/dd/yyyy").format(new Date()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java new file mode 100644 index 0000000..627761e --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.compress.archivers.ArchiveException; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.ArchiveStreamFactory; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; + +import com.google.common.collect.Lists; + + +@Slf4j +public class AzkabanJobHelper { + + public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists"); + try { + // NOTE: hacky way to determine if project already exists because Azkaban does not provides a way to + // .. check if the project already exists or not + boolean isPresent = StringUtils.isNotBlank(AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig)); + log.info("Project exists: " + isPresent); + + return isPresent; + } catch (IOException e) { + // Project doesn't exists + if (String.format("Project %s doesn't exist.", azkabanProjectConfig.getAzkabanProjectName()) + .equalsIgnoreCase(e.getMessage())) { + log.info("Project does not exists."); + return false; + } + // Project exists but with no read access to current user + if ("Permission denied. 
Need READ access.".equalsIgnoreCase(e.getMessage())) { + log.info("Project exists, but current user does not has READ access."); + return true; + } + // Some other error + log.error("Issue in checking if project is present", e); + throw e; + } + } + + public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName()); + String projectId = AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig); + log.info("Project id: " + projectId); + + return projectId; + } + + public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName()); + + // Create zip file + String zipFilePath = createAzkabanJobZip(azkabanProjectConfig); + log.info("Zip file path: " + zipFilePath); + + // Upload zip file to Azkaban + String projectId = AzkabanAjaxAPIClient.createAzkabanProject(sessionId, zipFilePath, azkabanProjectConfig); + log.info("Project Id: " + projectId); + + return projectId; + } + + public static String replaceAzkabanJob(String sessionId, String azkabanProjectId, + AzkabanProjectConfig azkabanProjectConfig) throws IOException { + log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName()); + + // Create zip file + String zipFilePath = createAzkabanJobZip(azkabanProjectConfig); + log.info("Zip file path: " + zipFilePath); + + // Replace the zip file on Azkaban + String projectId = AzkabanAjaxAPIClient.replaceAzkabanProject(sessionId, zipFilePath, azkabanProjectConfig); + log.info("Project Id: " + projectId); + + return projectId; + } + + public static void scheduleJob(String sessionId, String azkabanProjectId, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Scheduling Azkaban project: " + 
azkabanProjectConfig.getAzkabanProjectName()); + AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig); + } + + public static void changeJobSchedule(String sessionId, String azkabanProjectId, + AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Changing schedule for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName()); + AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig); + } + + public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName()); + String workDir = azkabanProjectConfig.getWorkDir(); + + Optional<String> jarUrlTemplate = azkabanProjectConfig.getAzkabanZipJarUrlTemplate(); + Optional<List<String>> jarNames = azkabanProjectConfig.getAzkabanZipJarNames(); + Optional<String> jarVersion = azkabanProjectConfig.getAzkabanZipJarVersion(); + Optional<List<String>> additionalFiles = azkabanProjectConfig.getAzkabanZipAdditionalFiles(); + boolean failIfJarNotFound = azkabanProjectConfig.getFailIfJarNotFound(); + String jobFlowName = azkabanProjectConfig.getAzkabanProjectFlowName(); + String zipFilename = azkabanProjectConfig.getAzkabanProjectZipFilename(); + + // Download the job jars + List<File> filesToAdd = Lists.newArrayList(); + if (jarNames.isPresent() && jarUrlTemplate.isPresent() && jarVersion.isPresent()) { + String urlTemplate = jarUrlTemplate.get(); + String version = jarVersion.get(); + for (String jarName : jarNames.get()) { + String jobJarUrl = urlTemplate.replaceAll("<module-version>", version).replaceAll("<module-name>", jarName); + log.info("Downloading job jar from: " + jobJarUrl + " to: " + workDir); + File jobJarFile = null; + try { + jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl); + filesToAdd.add(jobJarFile); + } catch (IOException e) { + if(failIfJarNotFound) { + 
throw e; + } + log.warn("Could not download: " + jobJarFile); + } + } + } + + // Download additional files + if (additionalFiles.isPresent()) { + List<String> files = additionalFiles.get(); + for (String fileName : files) { + log.info("Downloading additional file from: " + fileName + " to: " + workDir); + File additionalFile = null; + try { + additionalFile = downloadAzkabanJobJar(workDir, fileName); + filesToAdd.add(additionalFile); + } catch (IOException e) { + if(failIfJarNotFound) { + throw e; + } + log.warn("Could not download: " + additionalFile); + } + } + } + + // Write the config files + log.info("Writing Azkaban config files"); + File [] jobConfigFile = writeAzkabanConfigFiles(workDir, jobFlowName, azkabanProjectConfig); + filesToAdd.add(jobConfigFile[0]); + + // Create the zip file + log.info("Writing zip file"); + String zipfile = createZipFile(workDir, zipFilename, filesToAdd); + log.info("Wrote zip file: " + zipfile); + + return zipfile; + } + + private static String createZipFile(String directory, String zipFilename, List<File> filesToAdd) + throws IOException { + // Determine final zip file path + String zipFilePath = String.format("%s/%s", directory, zipFilename); + File zipFile = new File(zipFilePath); + zipFile.delete(); + + // Create and add files to zip file + addFilesToZip(zipFile, filesToAdd); + + return zipFilePath; + } + + private static void addFilesToZip(File zipFile, List<File> filesToAdd) throws IOException { + try { + @Cleanup + OutputStream archiveStream = new FileOutputStream(zipFile); + @Cleanup + ArchiveOutputStream archive = + new ArchiveStreamFactory().createArchiveOutputStream(ArchiveStreamFactory.ZIP, archiveStream); + + for (File fileToAdd : filesToAdd) { + ZipArchiveEntry entry = new ZipArchiveEntry(fileToAdd.getName()); + archive.putArchiveEntry(entry); + + @Cleanup + BufferedInputStream input = new BufferedInputStream(new FileInputStream(fileToAdd)); + IOUtils.copy(input, archive); + archive.closeArchiveEntry(); + } + + 
archive.finish(); + } catch (ArchiveException e) { + throw new IOException("Issue with creating archive", e); + } + } + + private static File[] writeAzkabanConfigFiles(String workDir, String flowName, AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + // Determine final config file path + String jobFilePath = String.format("%s/%s.job", workDir, flowName); + File jobFile = new File(jobFilePath); + jobFile.delete(); + + StringBuilder propertyFileContent = new StringBuilder(); + for (Map.Entry entry : azkabanProjectConfig.getJobSpec().getConfigAsProperties().entrySet()) { + propertyFileContent.append(String.format("%s=%s", entry.getKey(), entry.getValue())).append("\n"); + } + + // Write the job file + FileUtils.writeStringToFile(jobFile, propertyFileContent.toString(), Charset.forName("UTF-8"),true); + + return new File[] {jobFile}; + } + + private static File downloadAzkabanJobJar(String workDir, String jobJarUrl) + throws IOException { + // Determine final jar file path + String[] jobJarUrlParts = jobJarUrl.trim().split("/"); + String jobJarName = jobJarUrlParts[jobJarUrlParts.length-1]; + String jobJarFilePath = String.format("%s/%s", workDir, jobJarName); + File jobJarFile = new File(jobJarFilePath); + jobJarFile.delete(); + + // Create work directory if not already exists + FileUtils.forceMkdir(new File(workDir)); + + // Download jar file from artifactory + @Cleanup InputStream jobJarInputStream = new URL(jobJarUrl).openStream(); + @Cleanup OutputStream jobJarOutputStream = new FileOutputStream(jobJarFile); + IOUtils.copy(jobJarInputStream, jobJarOutputStream); + + // TODO: compare checksum + + return jobJarFile; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java new file mode 100644 index 0000000..ddae3d9 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.util.List; +import java.util.Optional; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.ToString; + +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.util.ConfigUtils; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +@Getter +@ToString +@AllArgsConstructor +@Builder(builderMethodName = "hiddenBuilder") +/*** + * Class to hold Azkaban project specific configs + */ +public class AzkabanProjectConfig { + private static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf"; + + private final String azkabanServerUrl; + + private final String azkabanProjectName; + private final String azkabanProjectDescription; + private final String azkabanProjectFlowName; + private final String azkabanGroupAdminUsers; + private final Optional<String> azkabanUserToProxy; + + private final Optional<List<String>> azkabanZipJarNames; + private final Optional<String> azkabanZipJarUrlTemplate; + private final Optional<String> azkabanZipJarVersion; + private final Optional<List<String>> azkabanZipAdditionalFiles; + private final Boolean failIfJarNotFound; + + private final JobSpec jobSpec; + + public AzkabanProjectConfig(JobSpec jobSpec) { + // Extract config objects + this.jobSpec = jobSpec; + Config defaultConfig = ConfigFactory.load(DEFAULT_AZKABAN_PROJECT_CONFIG_FILE); + Config config = jobSpec.getConfig().withFallback(defaultConfig); + + // Azkaban Infrastructure + this.azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY); + + // Azkaban Project Metadata + this.azkabanProjectName = constructProjectName(jobSpec, config); + this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY); + this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY); + 
this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY); + this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null)); + + // Azkaban Project Zip + this.azkabanZipJarNames = Optional.ofNullable(ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY)); + this.azkabanZipJarUrlTemplate = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY, null)); + this.azkabanZipJarVersion = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY, null)); + this.azkabanZipAdditionalFiles = Optional.ofNullable( + ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY)); + this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false); + } + + private String constructProjectName(JobSpec jobSpec, Config config) { + String projectNamePrefix = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_NAME_PREFIX_KEY, ""); + String projectNamePostfix = null == jobSpec.getUri() ? 
"" : + jobSpec.getUri().toString().replaceAll("_", "-").replaceAll("[^A-Za-z0-9\\-]", "_"); + + return trimProjectName(String.format("%s_%s", projectNamePrefix, projectNamePostfix)); + } + + /*** + * Get Azkaban project zip file name + * @return Azkaban project zip file name + */ + public String getAzkabanProjectZipFilename() { + return String.format("%s.zip", azkabanProjectName); + } + + /*** + * Get Azkaban project working directory, generated by prefixing a temp name + * @return Azkaban project working directory + */ + public String getWorkDir() { + return String.format("%s/%s/%s/%s", System.getProperty("user.dir"), "serviceAzkaban", azkabanProjectName, System.currentTimeMillis()); + } + + private static String trimProjectName(String projectName) { + // Azkaban does not support name greater than 64 chars, so limit it to 64 chars + if (projectName.length() > 64) { + // We are using string.hashcode() so that for same path the generated project name is same (and hence checking + // .. for path duplicates is deterministic. Using UUID or currentMillis will produce different shortened path + // .. 
for the same path every time) + int pathHash = projectName.hashCode(); + if (pathHash < 0) { + pathHash *= -1; + } + projectName = String.format("%s_%s", projectName.substring(0, 53), pathHash); + } + + return projectName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java new file mode 100644 index 0000000..65209c3 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.SpecExecutorInstance; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; + + +public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance { + protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); + protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults(); + + // Executor Instance + protected final Config _config; + protected final Logger _log; + protected final URI _specExecutorInstanceUri; + protected final Map<String, String> _capabilities; + + public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) { + _config = config; + _log = log.isPresent() ? 
log.get() : LoggerFactory.getLogger(getClass()); + try { + _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, + "NA")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + _capabilities = Maps.newHashMap(); + if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) { + String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY); + List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr); + for (String capability : capabilities) { + List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability); + Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported " + + "per capability, found: " + currentCapability); + _capabilities.put(currentCapability.get(0), currentCapability.get(1)); + } + } + } + + @Override + public URI getUri() { + return _specExecutorInstanceUri; + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null); + } + + @Override + public Future<Config> getConfig() { + return new CompletedFuture<>(_config, null); + } + + @Override + public Future<String> getHealth() { + return new CompletedFuture<>("Healthy", null); + } + + @Override + public Future<? 
extends Map<String, String>> getCapabilities() { + return new CompletedFuture<>(_capabilities, null); + } + + @Override + protected void startUp() throws Exception { + // nothing to do in default implementation + } + + @Override + protected void shutDown() throws Exception { + // nothing to do in default implementation + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java new file mode 100644 index 0000000..f093af2 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.commons.codec.EncoderException; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + + +public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInstance + implements SpecExecutorInstanceProducer<Spec>, Closeable { + + // Session Id for GaaS User + private String sessionId; + + + public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) { + super(config, log); + + try { + // Initialize Azkaban client / producer and cache credentials + String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY); + String azkabanPassword = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY); + String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY); + + sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl); + } catch (IOException | EncoderException e) { + throw new RuntimeException("Could not authenticate with Azkaban", e); + } + } + + public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) { + this(config, Optional.of(log)); + } + + /** Constructor with no logging */ + public AzkabanSpecExecutorInstanceProducer(Config config) { + this(config, Optional.<Logger>absent()); + } + + @Override + public void close() throws IOException { + + } + + @Override + public Future<?> addSpec(Spec addedSpec) { + // If project already exists, execute it + + // If project does not already 
exists, create and execute it + AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec); + try { + _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName()); + + // Deleted project also returns true if-project-exists check, so optimistically first create the project + // .. (it will create project if it was never created or deleted), if project exists it will fail with + // .. appropriate exception message, catch that and run in replace project mode if force overwrite is + // .. specified + try { + createNewAzkabanProject(sessionId, azkabanProjectConfig); + } catch (IOException e) { + if ("Project already exists.".equalsIgnoreCase(e.getMessage())) { + if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(), + ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) { + _log.info("Project already exists for this Spec, but force overwrite specified"); + updateExistingAzkabanProject(sessionId, azkabanProjectConfig); + } else { + _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s", + azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName())); + } + } else { + throw e; + } + } + } catch (IOException e) { + throw new RuntimeException("Issue in setting up Azkaban project.", e); + } + + return null; + } + + @Override + public Future<?> updateSpec(Spec updatedSpec) { + // Re-create project + AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec); + + try { + updateExistingAzkabanProject(sessionId, azkabanProjectConfig); + } catch (IOException e) { + throw new RuntimeException("Issue in setting up Azkaban project.", e); + } + + return new CompletedFuture<>(_config, null); + } + + @Override + public Future<?> deleteSpec(URI deletedSpecURI) { + // Delete project + throw new UnsupportedOperationException(); + } + + @Override + public Future<? 
extends List<Spec>> listSpecs() { + throw new UnsupportedOperationException(); + } + + private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { + // Create Azkaban Job + String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig); + + // Schedule Azkaban Job + AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig); + + _log.info(String.format("Azkaban project created: %smanager?project=%s", + azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName())); + } + + private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { + _log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(), + azkabanProjectConfig.getAzkabanProjectName())); + + // Get project Id + String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig); + + // Replace Azkaban Job + AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig); + + // Change schedule + AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java new file mode 100644 index 0000000..762561c --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java @@ -0,0 +1,38 @@ +/* + * 
/**
 * Configuration keys read by the Azkaban orchestration module. Every key lives under
 * {@link #GOBBLIN_SERVICE_AZKABAN_PREFIX}.
 */
public class ServiceAzkabanConfigKeys {
  public static final String GOBBLIN_SERVICE_AZKABAN_PREFIX = "gobblin.service.azkaban.";

  // Shared sub-prefixes, kept private so the public surface stays limited to the full keys
  private static final String PROJECT_PREFIX = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.";
  private static final String PROJECT_ZIP_PREFIX = PROJECT_PREFIX + "zip.";

  // Azkaban Session Specifics
  public static final String AZKABAN_USERNAME_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "username";
  public static final String AZKABAN_PASSWORD_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "password";
  public static final String AZKABAN_SERVER_URL_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "server.url";

  // Azkaban project metadata
  public static final String AZKABAN_PROJECT_NAME_PREFIX_KEY = PROJECT_PREFIX + "namePrefix";
  public static final String AZKABAN_PROJECT_DESCRIPTION_KEY = PROJECT_PREFIX + "description";
  public static final String AZKABAN_PROJECT_USER_TO_PROXY_KEY = PROJECT_PREFIX + "userToProxy";
  public static final String AZKABAN_PROJECT_FLOW_NAME_KEY = PROJECT_PREFIX + "flowName";
  public static final String AZKABAN_PROJECT_GROUP_ADMINS_KEY = PROJECT_PREFIX + "groupAdmins";
  public static final String AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY = PROJECT_PREFIX + "overwriteIfExists";

  // Azkaban project zip contents
  public static final String AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY = PROJECT_ZIP_PREFIX + "jarUrlTemplate";
  public static final String AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY = PROJECT_ZIP_PREFIX + "jarNames";
  public static final String AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY = PROJECT_ZIP_PREFIX + "jarVersion";
  public static final String AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY = PROJECT_ZIP_PREFIX + "failIfJarNotFound";
  public static final String AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY = PROJECT_ZIP_PREFIX + "additionalFilesUrl";
}
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default values
# These values should generally come from template being used by the Service
gobblin.service.azkaban.project.namePrefix=GobblinService_
gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
gobblin.service.azkaban.project.flowName="GobblinServiceFlow"

# Zip contents: the jar settings below are substituted from the job.jar.* values defined further down
gobblin.service.azkaban.project.zip.jarUrlTemplate=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplate}
gobblin.service.azkaban.project.zip.jarNames=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules}
gobblin.service.azkaban.project.zip.jarVersion=${gobblin.service.azkaban.project.job.jar.mavenGobblinVersion}

gobblin.service.azkaban.project.zip.failIfJarNotFound=false
gobblin.service.azkaban.project.zip.additionalFilesUrl=""

# <module-name> and <module-version> are placeholders filled in per jar when the URL is built
gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
# Each module is listed exactly once; a repeated name would be downloaded and zipped twice
gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
a/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template b/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template new file mode 100644 index 0000000..e69de29
