Repository: incubator-gobblin Updated Branches: refs/heads/master 754b06696 -> 0b1c52cd1
[GOBBLIN-664] Refactor Azkaban Client for session refresh. Closes #2535 from kyuamazon/session Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0b1c52cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0b1c52cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0b1c52cd Branch: refs/heads/master Commit: 0b1c52cd1dbba698498daacdce2060aa5eef5eb5 Parents: 754b066 Author: Kuai Yu <[email protected]> Authored: Mon Jan 14 11:36:52 2019 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Jan 14 11:36:52 2019 -0800 ---------------------------------------------------------------------- .../modules/orchestration/AzkabanClient.java | 65 +++++++-------- .../orchestration/AzkabanSessionManager.java | 49 ++++++++++++ .../modules/orchestration/SessionHelper.java | 84 ++++++++++++++++++++ .../modules/orchestration/SessionManager.java | 34 ++++++++ 4 files changed, 194 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java index cccbb5e..ec6bce4 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java @@ -64,22 +64,23 @@ import lombok.Builder; /** - * A simple client that uses Ajax API to communicate with Azkaban server. 
+ * A simple http based client that uses Ajax API to communicate with Azkaban server. * - * @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/} - * @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api} + * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#ajax-api"> + * https://azkaban.github.io/azkaban/docs/latest/#ajax-api + * </a> */ public class AzkabanClient implements Closeable { protected final String username; protected final String url; protected final long sessionExpireInMin; // default value is 12h. - + protected SessionManager sessionManager; protected String password; protected String sessionId; protected long sessionCreationTime = 0; protected CloseableHttpClient httpClient; - private boolean httpClientProvided = true; + private boolean customHttpClientProvided = true; private static Logger log = LoggerFactory.getLogger(AzkabanClient.class); /** @@ -90,51 +91,35 @@ public class AzkabanClient implements Closeable { String password, String url, long sessionExpireInMin, - CloseableHttpClient httpClient) + CloseableHttpClient httpClient, + SessionManager sessionManager) throws AzkabanClientException { this.username = username; this.password = password; this.url = url; this.sessionExpireInMin = sessionExpireInMin; this.httpClient = httpClient; - + this.sessionManager = sessionManager; this.initializeClient(); - this.initializeSession(); + this.initializeSessionManager(); + + this.sessionId = this.sessionManager.fetchSession(); + this.sessionCreationTime = System.nanoTime(); } private void initializeClient() throws AzkabanClientException { if (this.httpClient == null) { this.httpClient = createHttpClient(); - this.httpClientProvided = false; + this.customHttpClientProvided = false; } } - /** - * Create a session id that can be used in the future to communicate with Azkaban server. 
- */ - protected void initializeSession() throws AzkabanClientException { - try { - HttpPost httpPost = new HttpPost(this.url); - List<NameValuePair> nvps = new ArrayList<>(); - nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login")); - nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username)); - nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password)); - httpPost.setEntity(new UrlEncodedFormEntity(nvps)); - CloseableHttpResponse response = this.httpClient.execute(httpPost); - - try { - HttpEntity entity = response.getEntity(); - - // retrieve session id from entity - String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8"); - this.sessionId = parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID); - EntityUtils.consume(entity); - } finally { - response.close(); - } - this.sessionCreationTime = System.nanoTime(); - } catch (Exception e) { - throw new AzkabanClientException("Azkaban client cannot initialize session.", e); + private void initializeSessionManager() throws AzkabanClientException { + if (sessionManager == null) { + this.sessionManager = new AzkabanSessionManager(this.httpClient, + this.url, + this.username, + this.password); } } @@ -171,11 +156,15 @@ public class AzkabanClient implements Closeable { } } + /** + * When current session expired, use {@link SessionManager} to refresh the session id. + */ private void refreshSession() throws AzkabanClientException { Preconditions.checkArgument(this.sessionCreationTime != 0); if ((System.nanoTime() - this.sessionCreationTime) > Duration.ofMinutes(this.sessionExpireInMin).toNanos()) { log.info("Session expired. 
Generating a new session."); - this.initializeSession(); + this.sessionId = this.sessionManager.fetchSession(); + this.sessionCreationTime = System.nanoTime(); } } @@ -213,7 +202,7 @@ public class AzkabanClient implements Closeable { return AzkabanClient.parseResponse(jsonResponseString); } - private static Map<String, String> parseResponse(String jsonResponseString) throws IOException { + static Map<String, String> parseResponse(String jsonResponseString) throws IOException { // Parse Json Map<String, String> responseMap = new HashMap<>(); if (StringUtils.isNotBlank(jsonResponseString)) { @@ -509,7 +498,7 @@ public class AzkabanClient implements Closeable { @Override public void close() throws IOException { - if (!httpClientProvided) { + if (!customHttpClientProvided) { this.httpClient.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java new file mode 100644 index 0000000..3a44395 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import org.apache.http.impl.client.CloseableHttpClient; + +/** + * A {@link SessionManager} that implements session refreshing logic + * used by {@link AzkabanClient}. + */ +public class AzkabanSessionManager implements SessionManager { + private CloseableHttpClient httpClient; + private String url; + private String username; + private String password; + + public AzkabanSessionManager(CloseableHttpClient httpClient, + String url, + String username, + String password) { + this.httpClient = httpClient; + this.username = username; + this.password = password; + this.url = url; + } + + /** + * Fetch a session id that can be used in the future to communicate with Azkaban server. 
+ * @return session id + */ + public String fetchSession() throws AzkabanClientException { + return SessionHelper.getSessionId(this.httpClient, this.url, this.username, this.password); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java new file mode 100644 index 0000000..f491ad9 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; + +/** + * A helper class which can get session id using Azkaban authentication mechanism. + * + * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#api-authenticate"> + * https://azkaban.github.io/azkaban/docs/latest/#api-authenticate + * </a> + */ +public class SessionHelper { + + /** + * <p>Use Azkaban ajax api to fetch the session id. Required http request parameters are: + * <br>action=login The fixed parameter indicating the login action. + * <br>username The Azkaban username. + * <br>password The corresponding password. 
+ * </p> + * + * @param httpClient An apache http client + * @param url Azkaban ajax endpoint + * @param username username for Azkaban login + * @param password password for Azkaban login + * + * @return session id + */ + public static String getSessionId(CloseableHttpClient httpClient, String url, String username, String password) + throws AzkabanClientException { + try { + HttpPost httpPost = new HttpPost(url); + List<NameValuePair> nvps = new ArrayList<>(); + nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login")); + nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, username)); + nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, password)); + httpPost.setEntity(new UrlEncodedFormEntity(nvps)); + CloseableHttpResponse response = httpClient.execute(httpPost); + + try { + HttpEntity entity = response.getEntity(); + + // retrieve session id from entity + String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8"); + String sessionId = AzkabanClient.parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID); + EntityUtils.consume(entity); + return sessionId; + } catch (Exception e) { + throw new AzkabanClientException("Azkaban client cannot consume session response.", e); + } finally { + response.close(); + } + } catch (Exception e) { + throw new AzkabanClientException("Azkaban client cannot fetch session.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java new file mode 100644 index 0000000..b999fdf --- /dev/null +++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * Implements a session manager to refresh the session id. + * + * {@link AzkabanClient} needs this class to periodically refresh + * the session id when current session was expired. Please refer + * to {@link AzkabanClient#refreshSession}. + */ +public interface SessionManager { + + /** + * Get session id using Azkaban authentication mechanism. + * @return session id + */ + String fetchSession() throws AzkabanClientException; +}
