This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git

The following commit(s) were added to refs/heads/master by this push:
     new a7458ad  [GOBBLIN-1130] Add API for adding proxy user to azkaban project
a7458ad is described below

commit a7458adbdb5dcb8947329b8959b1522a29aaef86
Author: Jack Moseley <jmose...@linkedin.com>
AuthorDate: Wed Apr 29 23:39:30 2020 -0700

    [GOBBLIN-1130] Add API for adding proxy user to azkaban project

    Closes #2971 from jack-moseley/add-proxy
---
 .../modules/orchestration/AzkabanClient.java       | 18 ++++++++
 .../orchestration/AzkabanMultiCallables.java       | 54 +++++++++++++++++++---
 2 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index ccbc300..0e8ee48 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -422,6 +422,24 @@ public class AzkabanClient implements Closeable {
     return runWithRetry(callable, AzkabanFetchExecuteFlowStatus.class);
   }
 
+  /**
+   * Given a project and user, add that user as a proxy user in the project.
+   *
+   * @param projectName project name
+   * @param proxyUserName proxy user
+   *
+   * @return A status object indicating if AJAX request is successful.
+   */
+  public AzkabanClientStatus addProxyUser(String projectName, String proxyUserName) throws AzkabanClientException {
+    AzkabanMultiCallables.AddProxyUserCallable callable = AzkabanMultiCallables.AddProxyUserCallable.builder()
+        .client(this)
+        .projectName(projectName)
+        .proxyUserName(proxyUserName)
+        .build();
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
+  }
+
   private <T> T runWithRetry(Callable callable, Class<T> cls) throws AzkabanClientException {
     try {
       AzkabanClientStatus status = this.retryer.call(callable);
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
index 9091220..601b4cb 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
@@ -90,7 +90,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpPost);
         closer.register(response);
-        client.handleResponse(response);
+        AzkabanClient.handleResponse(response);
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
@@ -130,7 +130,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpGet);
         closer.register(response);
-        client.handleResponse(response);
+        AzkabanClient.handleResponse(response);
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
@@ -170,7 +170,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpPost);
         closer.register(response);
-        client.handleResponse(response);
+        AzkabanClient.handleResponse(response);
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
@@ -218,7 +218,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpPost);
         closer.register(response);
-        Map<String, String> map = client.handleResponse(response);
+        Map<String, String> map = AzkabanClient.handleResponse(response);
         return new AzkabanExecuteFlowStatus(
             new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID)));
       } catch (InvalidSessionException e) {
@@ -278,7 +278,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpGet);
         closer.register(response);
-        client.handleResponse(response);
+        AzkabanClient.handleResponse(response);
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
@@ -317,7 +317,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpGet);
         closer.register(response);
-        Map<String, String> map = client.handleResponse(response);
+        Map<String, String> map = AzkabanClient.handleResponse(response);
         return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
       } catch (InvalidSessionException e) {
         this.invalidSession = true;
@@ -363,7 +363,7 @@ class AzkabanMultiCallables {
 
         CloseableHttpResponse response = client.httpClient.execute(httpGet);
         closer.register(response);
-        Map<String, String> map = client.handleResponse(response);
+        Map<String, String> map = AzkabanClient.handleResponse(response);
         FileUtils.writeStringToFile(output, map.get(AzkabanClientParams.DATA), Charsets.UTF_8);
         return new AzkabanSuccess();
       } catch (InvalidSessionException e) {
@@ -375,4 +375,44 @@ class AzkabanMultiCallables {
       }
     }
   }
+
+  /**
+   * A callable that will add a proxy user to a project on Azkaban
+   */
+  @Builder
+  static class AddProxyUserCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String projectName;
+    private String proxyUserName;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanClientStatus call()
+        throws AzkabanClientException {
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "addProxyUser"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, proxyUserName));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+        HttpGet httpGet = new HttpGet(client.url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
+        httpGet.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpGet);
+        closer.register(response);
+        AzkabanClient.handleResponse(response);
+        return new AzkabanSuccess();
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot add proxy user " + proxyUserName, e);
+      }
+    }
+  }
 }