Repository: incubator-gobblin Updated Branches: refs/heads/master 27a54f05e -> 78da23b11
[GOBBLIN-572] Add AzkabanClient Closes #2437 from yukuai518/azkcli Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/78da23b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/78da23b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/78da23b1 Branch: refs/heads/master Commit: 78da23b11d2e6a8e05e5edbf33db0e425589fbbc Parents: 27a54f0 Author: Kuai Yu <[email protected]> Authored: Tue Aug 28 15:36:37 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Aug 28 15:36:37 2018 -0700 ---------------------------------------------------------------------- .../orchestration/AzkabanAjaxAPIClient.java | 66 +-- .../modules/orchestration/AzkabanClient.java | 475 +++++++++++++++++++ .../orchestration/AzkabanClientException.java | 35 ++ .../orchestration/AzkabanClientParams.java | 43 ++ .../orchestration/AzkabanClientStatus.java | 60 +++ .../orchestration/AzkabanExecuteFlowStatus.java | 42 ++ .../AzkabanFetchExecuteFlowStatus.java | 45 ++ .../orchestration/AzkabanClientTest.java | 287 +++++++++++ .../resources/azkakaban-job-basic.properties | 7 + .../test/resources/local-azkaban-service.conf | 25 + 10 files changed, 1029 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java index f54d6a5..3494c5c 100644 --- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java @@ -18,23 +18,17 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; -import java.util.HashMap; import java.util.Map; import java.util.Random; -import lombok.Cleanup; -import lombok.extern.slf4j.Slf4j; - import org.apache.commons.codec.EncoderException; import org.apache.commons.codec.net.URLCodec; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -54,12 +48,16 @@ import org.apache.http.ssl.TrustStrategy; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; import com.google.common.collect.Maps; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; @Slf4j +@Deprecated +/** + * This format of azkaban client is obsolete. Please use {@link AzkabanClient} as the new alternative. 
+ */ public class AzkabanAjaxAPIClient { private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); @@ -367,7 +365,7 @@ public class AzkabanAjaxAPIClient { // Make the call, get response @Cleanup CloseableHttpClient httpClient = getHttpClient(); HttpResponse response = httpClient.execute(getRequest); - return handleResponse(response); + return AzkabanClient.handleResponse(response); } @VisibleForTesting @@ -375,7 +373,7 @@ public class AzkabanAjaxAPIClient { // Make the call, get response @Cleanup CloseableHttpClient httpClient = getHttpClient(); HttpResponse response = httpClient.execute(postRequest); - return handleResponse(response); + return AzkabanClient.handleResponse(response); } private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName, @@ -399,7 +397,7 @@ public class AzkabanAjaxAPIClient { HttpResponse response = httpClient.execute(postRequest); // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban - return handleResponse(response, "projectId").get("projectId"); + return AzkabanClient.handleResponse(response).get("projectId"); } private static CloseableHttpClient getHttpClient() @@ -417,50 +415,6 @@ public class AzkabanAjaxAPIClient { } } - private static Map<String, String> handleResponse(HttpResponse response, String... 
responseKeys) - throws IOException { - if (response.getStatusLine().getStatusCode() != 201 && response.getStatusLine().getStatusCode()!= 200) { - log.error("Failed : HTTP error code : " + response.getStatusLine().getStatusCode()); - throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode()); - } - - // Get response in string - InputStream in = response.getEntity().getContent(); - String jsonResponseString = IOUtils.toString(in, "UTF-8"); - log.info("Response string: " + jsonResponseString); - - // Parse Json - Map<String, String> responseMap = new HashMap<>(); - if (StringUtils.isNotBlank(jsonResponseString)) { - JsonObject jsonObject = new JsonParser().parse(jsonResponseString).getAsJsonObject(); - - // Handle error if any - handleResponseError(jsonObject); - - // Get all responseKeys - for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) { - responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", "")); - } - } - - return responseMap; - } - - private static void handleResponseError(JsonObject jsonObject) throws IOException { - // Azkaban does not has a standard for error messages tag - if (null != jsonObject.get("status") && "error".equalsIgnoreCase(jsonObject.get("status").toString() - .replaceAll("\"", ""))) { - String message = (null != jsonObject.get("message")) ? - jsonObject.get("message").toString().replaceAll("\"", "") : "Issue in creating project"; - throw new IOException(message); - } - - if (null != jsonObject.get("error")) { - String error = jsonObject.get("error").toString().replaceAll("\"", ""); - throw new IOException(error); - } - } - /*** * Generate a random scheduled time between specified execution time window in the Azkaban compatible format * which is: hh,mm,a,z Eg. 
ScheduleTime=12,00,PM,PDT http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java new file mode 100644 index 0000000..75d088d --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.NameValuePair; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.message.BasicHeader; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.TrustStrategy; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import lombok.Builder; + + +/** + * A simple client that uses Ajax API to communicate with Azkaban server. + * + * Lombok will not consider fields from the superclass in the generated builder class. 
For a workaround, we put + * @Builder in constructors to allow Builder inheritance. + * + * @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/} + * @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api} + */ +public class AzkabanClient implements Closeable { + protected final String username; + protected final String password; + protected final String url; + protected final long sessionExpireInMin; // default value is 12h. + + protected String sessionId; + protected long sessionCreationTime = 0; + protected CloseableHttpClient client; + private static Logger log = LoggerFactory.getLogger(AzkabanClient.class); + + /** + * Child class should have a different builderMethodName. + */ + @Builder + protected AzkabanClient(String username, + String password, + String url, + long sessionExpireInMin) + throws AzkabanClientException { + this.username = username; + this.password = password; + this.url = url; + this.sessionExpireInMin = sessionExpireInMin; + this.client = getClient(); + this.initializeSession(); + } + + /** + * Create a session id that can be used in the future to communicate with Azkaban server. 
+ */ + protected void initializeSession() throws AzkabanClientException { + try { + HttpPost httpPost = new HttpPost(this.url); + List<NameValuePair> nvps = new ArrayList<>(); + nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login")); + nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password)); + httpPost.setEntity(new UrlEncodedFormEntity(nvps)); + CloseableHttpResponse response = this.client.execute(httpPost); + + try { + HttpEntity entity = response.getEntity(); + + // retrieve session id from entity + String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8"); + this.sessionId = parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID); + EntityUtils.consume(entity); + } finally { + response.close(); + } + this.sessionCreationTime = System.nanoTime(); + } catch (Exception e) { + throw new AzkabanClientException("Azkaban client cannot initialize session.", e); + } + } + + /** + * Create a {@link CloseableHttpClient} used to communicate with Azkaban server. + * Derived class can configure different http client by overriding this method. + * + * @return A closeable http client. 
+ */ + protected CloseableHttpClient getClient() throws AzkabanClientException { + try { + // SSLSocketFactory using custom TrustStrategy that ignores warnings about untrusted certificates + // Self sign SSL + SSLContextBuilder sslcb = new SSLContextBuilder(); + sslcb.loadTrustMaterial(null, (TrustStrategy) new TrustSelfSignedStrategy()); + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslcb.build()); + + HttpClientBuilder builder = HttpClientBuilder.create(); + RequestConfig requestConfig = RequestConfig.copy(RequestConfig.DEFAULT) + .setSocketTimeout(10000) + .setConnectTimeout(10000) + .setConnectionRequestTimeout(10000) + .build(); + + builder.disableCookieManagement() + .useSystemProperties() + .setDefaultRequestConfig(requestConfig) + .setConnectionManager(new BasicHttpClientConnectionManager()) + .setSSLSocketFactory(sslsf); + + return builder.build(); + } catch (Exception e) { + throw new AzkabanClientException("HttpClient cannot be created", e); + } + } + + private void refreshSession() throws AzkabanClientException { + Preconditions.checkArgument(this.sessionCreationTime != 0); + if ((System.nanoTime() - this.sessionCreationTime) > Duration.ofMinutes(this.sessionExpireInMin).toNanos()) { + log.info("Session expired. Generating a new session."); + this.initializeSession(); + } + } + + /** + * Convert a {@link HttpResponse} to a <string, string> map. + * Put protected modifier here so it is visible to {@link AzkabanAjaxAPIClient}. + * + * @param response An http response returned by {@link org.apache.http.client.HttpClient} execution. + * This should be JSON string. 
+ * @return A map of the top-level key/value pairs of the JSON object + */ + protected static Map<String, String> handleResponse(HttpResponse response) throws IOException { + int code = response.getStatusLine().getStatusCode(); + if (code != HttpStatus.SC_CREATED && code != HttpStatus.SC_OK) { + log.error("Failed : HTTP error code : " + response.getStatusLine().getStatusCode()); + throw new AzkabanClientException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode()); + } + + // Get response in string + HttpEntity entity = null; + String jsonResponseString; + + try { + entity = response.getEntity(); + jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8"); + log.info("Response string: " + jsonResponseString); + } catch (Exception e) { + throw new AzkabanClientException("Cannot convert response to a string", e); + } finally { + if (entity != null) { + EntityUtils.consume(entity); + } + } + + return AzkabanClient.parseResponse(jsonResponseString); + } + + private static Map<String, String> parseResponse(String jsonResponseString) throws IOException { + // Parse Json + Map<String, String> responseMap = new HashMap<>(); + if (StringUtils.isNotBlank(jsonResponseString)) { + JsonObject jsonObject = new JsonParser().parse(jsonResponseString).getAsJsonObject(); + + // Handle error if any + handleResponseError(jsonObject); + + // Copy every top-level key/value pair + for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) { + responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", "")); + } + } + return responseMap; + } + + private static void handleResponseError(JsonObject jsonObject) throws IOException { + // Azkaban does not have a standard tag for error messages + if (null != jsonObject.get(AzkabanClientParams.STATUS) && + AzkabanClientParams.ERROR.equalsIgnoreCase(jsonObject.get(AzkabanClientParams.STATUS).toString() + .replaceAll("\"", ""))) { + String message = (null != jsonObject.get(AzkabanClientParams.MESSAGE)) ?
jsonObject.get(AzkabanClientParams.MESSAGE).toString() + .replaceAll("\"", "") : "Unknown issue"; + throw new IOException(message); + } + + if (null != jsonObject.get(AzkabanClientParams.ERROR)) { + String error = jsonObject.get(AzkabanClientParams.ERROR).toString().replaceAll("\"", ""); + throw new AzkabanClientException(error); + } + } + + /** + * Creates a project. + * + * @param projectName project name + * @param description project description + * + * @return A status object indicating if AJAX request is successful. + */ + public AzkabanClientStatus createProject( + String projectName, + String description) { + try { + refreshSession(); + HttpPost httpPost = new HttpPost(this.url + "/manager"); + List<NameValuePair> nvps = new ArrayList<>(); + nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "create")); + nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, projectName)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.DESCRIPTION, description)); + httpPost.setEntity(new UrlEncodedFormEntity(nvps)); + + Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"); + Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest"); + httpPost.setHeaders(new Header[]{contentType, requestType}); + + CloseableHttpResponse response = this.client.execute(httpPost); + + try { + handleResponse(response); + return new AzkabanClientStatus.SUCCESS(); + } finally { + response.close(); + } + } catch (Exception e) { + return new AzkabanClientStatus.FAIL("Azkaban client cannot create project.", e); + } + } + + /** + * Deletes a project. Currently no response message will be returned after finishing + * the delete operation. Thus success status is always expected. + * + * @param projectName project name + * + * @return A status object indicating if AJAX request is successful. 
+ */ + public AzkabanClientStatus deleteProject(String projectName) { + try { + refreshSession(); + List<NameValuePair> nvps = new ArrayList<>(); + nvps.add(new BasicNameValuePair(AzkabanClientParams.DELETE, "true")); + nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName)); + + Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"); + Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest"); + + HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8")); + httpGet.setHeaders(new Header[]{contentType, requestType}); + + CloseableHttpResponse response = this.client.execute(httpGet); + response.close(); + return new AzkabanClientStatus.SUCCESS(); + + } catch (Exception e) { + return new AzkabanClientStatus.FAIL("Azkaban client cannot delete project = " + + projectName, e); + } + } + + /** + * Updates a project by uploading a new zip file. Before uploading any project zip files, + * the project should be created first. + * + * @param projectName project name + * @param zipFile zip file + * + * @return A status object indicating if AJAX request is successful. 
+ */ + public AzkabanClientStatus uploadProjectZip( + String projectName, + File zipFile) { + try { + refreshSession(); + HttpPost httpPost = new HttpPost(this.url + "/manager"); + HttpEntity entity = MultipartEntityBuilder.create() + .addTextBody(AzkabanClientParams.SESSION_ID, sessionId) + .addTextBody(AzkabanClientParams.AJAX, "upload") + .addTextBody(AzkabanClientParams.PROJECT, projectName) + .addBinaryBody("file", zipFile, + ContentType.create("application/zip"), zipFile.getName()) + .build(); + httpPost.setEntity(entity); + + CloseableHttpResponse response = this.client.execute(httpPost); + + try { + handleResponse(response); + return new AzkabanClientStatus.SUCCESS(); + } finally { + response.close(); + } + } catch (Exception e) { + return new AzkabanClientStatus.FAIL("Azkaban client cannot upload zip to project = " + + projectName, e); + } + } + + /** + * Execute a flow by providing flow parameters and options. The project and flow should be created first. + * + * @param projectName project name + * @param flowName flow name + * @param flowOptions flow options + * @param flowParameters flow parameters + * + * @return The status object which contains success status and execution id. 
+ */ + public AzkabanExecuteFlowStatus executeFlowWithOptions( + String projectName, + String flowName, + Map<String, String> flowOptions, + Map<String, String> flowParameters) { + + try { + refreshSession(); + HttpPost httpPost = new HttpPost(this.url + "/executor"); + List<NameValuePair> nvps = new ArrayList<>(); + nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "executeFlow")); + nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.FLOW, flowName)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.CONCURRENT_OPTION, "ignore")); + + addFlowOptions(nvps, flowOptions); + addFlowParameters(nvps, flowParameters); + + httpPost.setEntity(new UrlEncodedFormEntity(nvps)); + + Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"); + Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest"); + httpPost.setHeaders(new Header[]{contentType, requestType}); + + CloseableHttpResponse response = this.client.execute(httpPost); + + try { + Map<String, String> map = handleResponse(response); + return new AzkabanExecuteFlowStatus( + new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID))); + } finally { + response.close(); + } + } catch (Exception e) { + return new AzkabanExecuteFlowStatus("Azkaban client cannot execute flow = " + + flowName, e); + } + } + + /** + * Execute a flow with flow parameters. The project and flow should be created first. + * + * @param projectName project name + * @param flowName flow name + * @param flowParameters flow parameters + * + * @return The status object which contains success status and execution id. 
+ */ + public AzkabanExecuteFlowStatus executeFlow( + String projectName, + String flowName, + Map<String, String> flowParameters) { + return executeFlowWithOptions(projectName, flowName, null, flowParameters); + } + + /** + * Given an execution id, fetches all the detailed information of that execution, including a list of all the job executions. + * + * @param execId execution id to be fetched. + * + * @return The status object which contains success status and all the detailed information of that execution. + */ + public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) { + try { + refreshSession(); + List<NameValuePair> nvps = new ArrayList<>(); + nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow")); + nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId)); + + Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"); + Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest"); + + HttpGet httpGet = new HttpGet(url + "/executor?" 
+ URLEncodedUtils.format(nvps, "UTF-8")); + httpGet.setHeaders(new Header[]{contentType, requestType}); + + CloseableHttpResponse response = this.client.execute(httpGet); + try { + Map<String, String> map = handleResponse(response); + return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map)); + } finally { + response.close(); + } + } catch (Exception e) { + return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot " + + "fetch execId " + execId, e); + } + } + + private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) { + if (flowParams != null) { + for (Map.Entry<String, String> entry : flowParams.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { + log.debug("New flow parameter added:" + key + "-->" + value); + nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value)); + } + } + } + } + + private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) { + if (flowOptions != null) { + for (Map.Entry<String, String> option : flowOptions.entrySet()) { + log.debug("New flow option added:" + option .getKey()+ "-->" + option.getValue()); + nvps.add(new BasicNameValuePair(option.getKey(), option.getValue())); + } + } + } + + @Override + public void close() + throws IOException { + this.client.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java new file mode 100644 index 0000000..da94c03 --- 
/dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; + +/** + * Exception raised by {@link AzkabanClient}. 
+ */ +public class AzkabanClientException extends IOException { + private static final long serialVersionUID = 11324144L; + + public AzkabanClientException(String message, Exception e) { + super(message, e); + } + + public AzkabanClientException(String message) { + super(message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java new file mode 100644 index 0000000..27bb4f2 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * A collection of attributes used by {@link AzkabanClient} to form an HTTP request, + * and parse the HTTP response. More details can be found at + * {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api} + */ +public class AzkabanClientParams { + public static final String ACTION = "action"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String SESSION_ID = "session.id"; + + public static final String NAME = "name"; + public static final String DELETE = "delete"; + public static final String DESCRIPTION = "description"; + public static final String PROJECT = "project"; + public static final String FLOW = "flow"; + public static final String AJAX = "ajax"; + public static final String CONCURRENT_OPTION = "concurrentOption"; + public static final String MESSAGE = "message"; + public static final String STATUS = "status"; + public static final String ERROR = "error"; + public static final String EXECID = "execid"; + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java new file mode 100644 index 0000000..132f4ad --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import lombok.Getter; + +@Getter +public abstract class AzkabanClientStatus<RS> { + private boolean success = true; + private String failMsg = ""; + private Throwable throwable = null; + + private RS response = null; + + public AzkabanClientStatus() { + } + + public AzkabanClientStatus(RS response) { + this.response = response; + } + + public AzkabanClientStatus(String failMsg, Throwable throwable) { + this.success = false; + this.failMsg = failMsg; + this.throwable = throwable; + } + + /** + * This status captures basic success. + */ + public static class SUCCESS extends AzkabanClientStatus<Object> { + public SUCCESS() { + super(); + } + } + + /** + * This status captures basic failure (fail message and throwable). 
+ */ + public static class FAIL extends AzkabanClientStatus<Object> { + public FAIL(String failMsg, Throwable throwable) { + super(failMsg, throwable); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java new file mode 100644 index 0000000..be9af89 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * Outcome of an {@link AzkabanClient} call that starts a flow execution, i.e.
+ * {@link AzkabanClient#executeFlowWithOptions} (and {@link AzkabanClient#executeFlow}).
+ *
+ * <p>When successful, {@code getResponse()} yields the {@link ExecuteId} of the new execution;
+ * otherwise the failure message and cause are available instead.
+ */
+public class AzkabanExecuteFlowStatus extends AzkabanClientStatus<AzkabanExecuteFlowStatus.ExecuteId> {
+
+  /**
+   * Holder for the execution id of a started flow, kept as the string it was built with.
+   */
+  @AllArgsConstructor
+  @Getter
+  public static class ExecuteId {
+    String execId;
+  }
+
+  /**
+   * Builds a failed status.
+   *
+   * @param reason human-readable description of what went wrong
+   * @param cause the underlying error
+   */
+  public AzkabanExecuteFlowStatus(String reason, Throwable cause) {
+    super(reason, cause);
+  }
+
+  /**
+   * Builds a successful status.
+   *
+   * @param id execution id reported for the started flow
+   */
+  public AzkabanExecuteFlowStatus(ExecuteId id) {
+    super(id);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
new file mode 100644
index 0000000..1e0afe8
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * Outcome of {@link AzkabanClient#fetchFlowExecution(String)}.
+ *
+ * <p>When successful, {@code getResponse()} yields an {@link Execution} holding the details of
+ * the requested flow execution; otherwise the failure message and cause are available instead.
+ */
+public class AzkabanFetchExecuteFlowStatus extends AzkabanClientStatus<AzkabanFetchExecuteFlowStatus.Execution> {
+
+  /**
+   * Details of one flow execution, exposed as a string-to-string map.
+   *
+   * <p>The map is stored by reference, not copied. NOTE(review): keys presumably mirror the
+   * fields of Azkaban's fetch-execution response; confirm against {@link AzkabanClient}.
+   */
+  @AllArgsConstructor
+  @Getter
+  public static class Execution {
+    Map<String, String> map;
+  }
+
+  /**
+   * Builds a failed status.
+   *
+   * @param reason human-readable description of what went wrong
+   * @param cause the underlying error
+   */
+  public AzkabanFetchExecuteFlowStatus(String reason, Throwable cause) {
+    super(reason, cause);
+  }
+
+  /**
+   * Builds a successful status.
+   *
+   * @param details the fetched execution details
+   */
+  public AzkabanFetchExecuteFlowStatus(Execution details) {
+    super(details);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
new file mode 100644
index 0000000..9e86b25
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+/**
+ * Tests for {@link AzkabanClient} against a running Azkaban server.
+ *
+ * <p>All tests are disabled by default because they assume an Azkaban solo server is set up on
+ * localhost:8081 (connection settings come from {@code local-azkaban-service.conf}).
+ *
+ * <p>See https://azkaban.github.io/azkaban/docs/latest/ for how to set up the Azkaban solo server.
+ */
+@Slf4j
+public class AzkabanClientTest {
+  /** Session lifetime given to the client; {@code testSessionExpiration} sleeps this long. */
+  private static final long SESSION_EXPIRE_IN_MIN = 1;
+
+  private AzkabanClient client = null;
+  private FileSystem fs = null;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    Config azkConfig = ConfigFactory.load("local-azkaban-service.conf");
+    String userName = azkConfig.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+    String password = azkConfig.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+    String url = azkConfig.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+    this.client = AzkabanClient.builder()
+        .username(userName)
+        .password(password)
+        .url(url)
+        .sessionExpireInMin(SESSION_EXPIRE_IN_MIN)
+        .build();
+    String uri = ConfigurationKeys.LOCAL_FS_URI;
+    this.fs = FileSystem.get(URI.create(uri), new Configuration());
+  }
+
+  @AfterClass
+  public void cleanup() throws IOException {
+    // setup() may have failed before the client was built; don't mask that failure with an NPE.
+    if (this.client != null) {
+      this.client.close();
+    }
+  }
+
+  /**
+   * Deletes {@code projectName} to start from a clean state, then creates it again, asserting
+   * that both calls succeed.
+   */
+  private void ensureProjectExist(String projectName, String description) {
+    AzkabanClientStatus<?> status;
+    // make sure it is in a clean state
+    status = this.client.deleteProject(projectName);
+    Assert.assertTrue(status.isSuccess());
+
+    // make sure the project is created successfully
+    status = this.client.createProject(projectName, description);
+    Assert.assertTrue(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testCreateProject() {
+    String projectName = "project-create";
+    String description = "This is a create project test.";
+    AzkabanClientStatus<?> status;
+
+    ensureProjectExist(projectName, description);
+
+    // creating the same project a second time should fail
+    status = this.client.createProject(projectName, description);
+    Assert.assertFalse(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testDeleteProject() {
+    String projectName = "project-delete";
+    String description = "This is a delete project test.";
+    AzkabanClientStatus<?> status;
+
+    ensureProjectExist(projectName, description);
+
+    // delete the new project
+    status = this.client.deleteProject(projectName);
+    Assert.assertTrue(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testUploadZip() throws IOException {
+    String projectName = "project-upload";
+    String description = "This is a upload project test.";
+    String flowName = "test-upload";
+    AzkabanClientStatus<?> status;
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    status = this.client.uploadProjectZip(projectName, zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    // uploading to a project that does not exist should fail
+    status = this.client.uploadProjectZip("Non-existed-project", zipFile);
+    Assert.assertFalse(status.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testExecuteFlow() throws IOException {
+    String projectName = "project-execFlow";
+    String description = "This is a flow execution test.";
+    String flowName = "test-exec-flow";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus<?> status = this.client.uploadProjectZip(projectName, zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, flowName, Maps.newHashMap());
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().getExecId());
+  }
+
+  @Test(enabled = false)
+  public void testExecuteFlowWithParams() throws IOException {
+    String projectName = "project-execFlow-Param";
+    String description = "This is a flow execution test.";
+    String flowName = "test-exec-flow-param";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus<?> status = this.client.uploadProjectZip(projectName, zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    Map<String, String> flowParams = Maps.newHashMap();
+    flowParams.put("gobblin.source", "DummySource");
+    flowParams.put("gobblin.dataset.pattern", "/data/tracking/MessageActionEvent/hourly/*/*/*/*");
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, flowName, flowParams);
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().getExecId());
+  }
+
+  @Test(enabled = false)
+  public void testExecuteFlowWithOptions() throws IOException {
+    String projectName = "project-execFlow-Option";
+    String description = "This is a flow execution test.";
+    String flowName = "test-exec-flow-options";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus<?> status = this.client.uploadProjectZip(projectName, zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    Map<String, String> flowOptions = Maps.newHashMap();
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlowWithOptions(projectName, flowName, flowOptions, Maps.newHashMap());
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().getExecId());
+  }
+
+  @Test(enabled = false)
+  public void testFetchFlowExecution() throws Exception {
+    String projectName = "project-fetch-flow-exec";
+    String description = "This is a flow execution fetch test.";
+    String flowName = "test-fetch-flow-executions";
+
+    ensureProjectExist(projectName, description);
+
+    // upload Zip to project
+    File zipFile = createAzkabanZip(flowName);
+    AzkabanClientStatus<?> status = this.client.uploadProjectZip(projectName, zipFile);
+    Assert.assertTrue(status.isSuccess());
+
+    Map<String, String> flowOptions = Maps.newHashMap();
+
+    // execute a flow
+    AzkabanExecuteFlowStatus execStatus = this.client.executeFlowWithOptions(projectName, flowName, flowOptions, Maps.newHashMap());
+    Assert.assertTrue(execStatus.isSuccess());
+    log.info("Execid: {}", execStatus.getResponse().getExecId());
+
+    // wait for the job to start (and, as expected here, fail)
+    Thread.sleep(3000);
+
+    // The job itself is expected to fail, but fetching its execution details must still succeed.
+    AzkabanFetchExecuteFlowStatus fetchExecuteFlowStatus =
+        this.client.fetchFlowExecution(execStatus.getResponse().getExecId());
+    Assert.assertTrue(fetchExecuteFlowStatus.isSuccess());
+  }
+
+  @Test(enabled = false)
+  public void testSessionExpiration() throws Exception {
+    String projectName = "project-session-expiration-test";
+    String description = "This is a session expiration test.";
+    // Sleep for the configured session lifetime so the following calls run after it has elapsed.
+    Thread.sleep(TimeUnit.MINUTES.toMillis(SESSION_EXPIRE_IN_MIN));
+    ensureProjectExist(projectName, description);
+  }
+
+  /**
+   * Builds {@code /tmp/testAzkabanZip/<flowName>.zip} containing a single entry
+   * {@code <flowName>/<flowName>.job} whose properties come from the classpath resource
+   * {@code azkakaban-job-basic.properties}.
+   *
+   * @param flowName name used for the job directory, the job file and the zip file
+   * @return the created zip file
+   * @throws IOException if the job file or the zip cannot be written
+   */
+  private File createAzkabanZip(String flowName) throws IOException {
+    Properties jobProps = new Properties();
+    try (InputStream propsIn = this.getClass().getClassLoader()
+        .getResourceAsStream("azkakaban-job-basic.properties")) {
+      Assert.assertNotNull(propsIn, "Missing test resource azkakaban-job-basic.properties");
+      jobProps.load(propsIn);
+    }
+
+    String basePath = "/tmp/testAzkabanZip";
+    this.fs.delete(new Path(basePath), true);
+
+    // create testAzkabanZip/<flowName> dir
+    File jobDir = new File(basePath, flowName);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    // create testAzkabanZip/<flowName>/<flowName>.job
+    File jobFile = new File(jobDir, flowName + ".job");
+    try (OutputStream jobOut = new FileOutputStream(jobFile)) {
+      jobProps.store(jobOut, "Writing a test job file.");
+    }
+
+    // create testAzkabanZip/<flowName>.zip; closing the ZipOutputStream also closes the file stream
+    File zipFile = new File(jobDir.getPath() + ".zip");
+    try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zipFile))) {
+      addDirToZipArchive(zos, jobDir, null);
+    }
+    return zipFile;
+  }
+
+  /**
+   * Recursively adds {@code fileToZip} to {@code zos}.
+   *
+   * <p>Entry names are file names joined with {@code /}, prefixed by {@code parentDirectoryName}
+   * when it is non-empty. Only regular files become entries; directories are traversed but get no
+   * entry of their own. A {@code null} or non-existent file is silently skipped.
+   *
+   * @throws IOException if a directory cannot be listed or a file cannot be read or written
+   */
+  private static void addDirToZipArchive(ZipOutputStream zos, File fileToZip, String parentDirectoryName)
+      throws IOException {
+    if (fileToZip == null || !fileToZip.exists()) {
+      return;
+    }
+
+    String zipEntryName = fileToZip.getName();
+    if (parentDirectoryName != null && !parentDirectoryName.isEmpty()) {
+      zipEntryName = parentDirectoryName + "/" + fileToZip.getName();
+    }
+
+    if (fileToZip.isDirectory()) {
+      File[] children = fileToZip.listFiles();
+      // listFiles() returns null on an I/O error; fail with a clear message instead of an NPE.
+      if (children == null) {
+        throw new IOException("Unable to list directory " + fileToZip);
+      }
+      for (File child : children) {
+        addDirToZipArchive(zos, child, zipEntryName);
+      }
+    } else {
+      byte[] buffer = new byte[1024];
+      try (FileInputStream fis = new FileInputStream(fileToZip)) {
+        zos.putNextEntry(new ZipEntry(zipEntryName));
+        int length;
+        while ((length = fis.read(buffer)) > 0) {
+          zos.write(buffer, 0, length);
+        }
+        zos.closeEntry();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties b/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
new file mode 100644
index 0000000..4a2ba83
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/resources/azkakaban-job-basic.properties
@@ -0,0 +1,7 @@
+type=hadoopJava
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+
+# Azkaban properties
+job.name=Test
+job.group=TestGroup
+job.description=A test job
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/78da23b1/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf b/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf
new file mode 100644
index 0000000..2aa6e6c
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/resources/local-azkaban-service.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default values
+# These values should generally come from the template used by the Service.
+
+# Default config for Azkaban infrastructure
+# (the current defaults match a default local Azkaban setup)
+gobblin.service.azkaban.username=azkaban
+gobblin.service.azkaban.password=azkaban
+gobblin.service.azkaban.server.url="http://localhost:8081/"
\ No newline at end of file
