Repository: incubator-gobblin Updated Branches: refs/heads/master f43de8c4d -> 9acb2b257
[GOBBLIN-591] Allow user to pass in a new http client to AzkabanClient Closes #2458 from yukuai518/azzz Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9acb2b25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9acb2b25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9acb2b25 Branch: refs/heads/master Commit: 9acb2b25744498282131bf71499ecf96920f9fbd Parents: f43de8c Author: Kuai Yu <[email protected]> Authored: Tue Sep 18 13:06:00 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Sep 18 13:06:00 2018 -0700 ---------------------------------------------------------------------- .../modules/orchestration/AzkabanClient.java | 39 +++++++++++++------- .../orchestration/AzkabanClientTest.java | 11 +----- 2 files changed, 27 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9acb2b25/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java index 455e409..624e24a 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java @@ -66,9 +66,6 @@ import lombok.Builder; /** * A simple client that uses Ajax API to communicate with Azkaban server. * - * Lombok will not consider fields from the superclass in the generated builder class. For a workaround, we put - * @Builder in constructors to allow Builder inheritance. - * * @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/} * @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api} */ @@ -80,7 +77,9 @@ public class AzkabanClient implements Closeable { protected String password; protected String sessionId; protected long sessionCreationTime = 0; - protected CloseableHttpClient client; + protected CloseableHttpClient httpClient; + + private boolean httpClientProvided = true; private static Logger log = LoggerFactory.getLogger(AzkabanClient.class); /** @@ -90,16 +89,26 @@ public class AzkabanClient implements Closeable { protected AzkabanClient(String username, String password, String url, - long sessionExpireInMin) + long sessionExpireInMin, + CloseableHttpClient httpClient) throws AzkabanClientException { this.username = username; this.password = password; this.url = url; this.sessionExpireInMin = sessionExpireInMin; - this.client = getClient(); + this.httpClient = httpClient; + + this.initializeClient(); this.initializeSession(); } + private void initializeClient() throws AzkabanClientException { + if (this.httpClient == null) { + this.httpClient = createHttpClient(); + this.httpClientProvided = false; + } + } + /** * Create a session id that can be used in the future to communicate with Azkaban server. */ @@ -111,7 +120,7 @@ public class AzkabanClient implements Closeable { nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username)); nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password)); httpPost.setEntity(new UrlEncodedFormEntity(nvps)); - CloseableHttpResponse response = this.client.execute(httpPost); + CloseableHttpResponse response = this.httpClient.execute(httpPost); try { HttpEntity entity = response.getEntity(); @@ -135,7 +144,7 @@ public class AzkabanClient implements Closeable { * * @return A closeable http client. */ - protected CloseableHttpClient getClient() throws AzkabanClientException { + private CloseableHttpClient createHttpClient() throws AzkabanClientException { try { // SSLSocketFactory using custom TrustStrategy that ignores warnings about untrusted certificates // Self sign SSL @@ -262,7 +271,7 @@ public class AzkabanClient implements Closeable { Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest"); httpPost.setHeaders(new Header[]{contentType, requestType}); - CloseableHttpResponse response = this.client.execute(httpPost); + CloseableHttpResponse response = this.httpClient.execute(httpPost); try { handleResponse(response); @@ -297,7 +306,7 @@ public class AzkabanClient implements Closeable { HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8")); httpGet.setHeaders(new Header[]{contentType, requestType}); - CloseableHttpResponse response = this.client.execute(httpGet); + CloseableHttpResponse response = this.httpClient.execute(httpGet); response.close(); return new AzkabanClientStatus.SUCCESS(); @@ -331,7 +340,7 @@ public class AzkabanClient implements Closeable { .build(); httpPost.setEntity(entity); - CloseableHttpResponse response = this.client.execute(httpPost); + CloseableHttpResponse response = this.httpClient.execute(httpPost); try { handleResponse(response); @@ -380,7 +389,7 @@ public class AzkabanClient implements Closeable { Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest"); httpPost.setHeaders(new Header[]{contentType, requestType}); - CloseableHttpResponse response = this.client.execute(httpPost); + CloseableHttpResponse response = this.httpClient.execute(httpPost); try { Map<String, String> map = handleResponse(response); @@ -432,7 +441,7 @@ public class AzkabanClient implements Closeable { HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8")); httpGet.setHeaders(new Header[]{contentType, requestType}); - CloseableHttpResponse response = this.client.execute(httpGet); + CloseableHttpResponse response = this.httpClient.execute(httpGet); try { Map<String, String> map = handleResponse(response); return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map)); @@ -470,6 +479,8 @@ public class AzkabanClient implements Closeable { @Override public void close() throws IOException { - this.client.close(); + if (!httpClientProvided) { + this.httpClient.close(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9acb2b25/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java index 9e86b25..edefe3e 100644 --- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java +++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java @@ -23,15 +23,12 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.net.URI; import java.util.Map; import java.util.Properties; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -43,7 +40,6 @@ import com.typesafe.config.ConfigFactory; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.configuration.ConfigurationKeys; /** @@ -54,7 +50,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys; @Slf4j public class AzkabanClientTest { private AzkabanClient client = null; - private FileSystem fs = null; private long sessionExpireInMin = 1; @BeforeClass public void setup() throws Exception { @@ -68,8 +63,6 @@ public class AzkabanClientTest { .url(url) .sessionExpireInMin(sessionExpireInMin) .build(); - String uri = ConfigurationKeys.LOCAL_FS_URI; - this.fs = FileSystem.get(URI.create(uri), new Configuration()); } @AfterClass @@ -238,7 +231,7 @@ public class AzkabanClientTest { getResourceAsStream("azkakaban-job-basic.properties")); String basePath = "/tmp/testAzkabanZip"; - this.fs.delete(new Path(basePath), true); + FileUtils.deleteDirectory(new File(basePath)); // create testAzkabanZip/test dir File jobDir = new File(basePath, flowName);
