Updated Branches: refs/heads/master 67e54f610 -> 2a2e2139b
OOZIE-1609 HA support for share lib. (puru via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2a2e2139 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2a2e2139 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2a2e2139 Branch: refs/heads/master Commit: 2a2e2139b58ad1bec882870965b125fbdbf9123f Parents: 67e54f6 Author: Robert Kanter <[email protected]> Authored: Wed Jan 29 14:14:33 2014 -0800 Committer: Robert Kanter <[email protected]> Committed: Wed Jan 29 14:14:33 2014 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/client/OozieClient.java | 27 +++-- .../apache/oozie/client/rest/RestConstants.java | 2 +- .../oozie/service/JobsConcurrencyService.java | 21 ++++ .../apache/oozie/service/ShareLibService.java | 8 +- .../oozie/service/ZKJobsConcurrencyService.java | 37 ++++++ .../oozie/service/ZKXLogStreamingService.java | 112 ++----------------- .../apache/oozie/servlet/BaseAdminServlet.java | 101 +++++++---------- .../service/TestJobsConcurrencyService.java | 23 ++++ .../service/TestZKJobsConcurrencyService.java | 22 ++++ .../service/TestZKXLogStreamingService.java | 4 +- docs/src/site/twiki/DG_CommandLineTool.twiki | 34 ++++-- docs/src/site/twiki/WebServicesAPI.twiki | 36 ++++-- release-log.txt | 1 + 13 files changed, 237 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index 9d6c9e0..b0a85fd 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -1521,7 +1521,8 @@ public class OozieClient { private class UpdateSharelib extends ClientCallable<String> { UpdateSharelib() { - super("GET", RestConstants.ADMIN, RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams()); + super("GET", RestConstants.ADMIN, RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams( + RestConstants.ALL_SERVER_REQUEST, "true")); } @Override @@ -1529,15 +1530,27 @@ public class OozieClient { StringBuffer bf = new StringBuffer(); if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { Reader reader = new InputStreamReader(conn.getInputStream()); - JSONObject json = (JSONObject) ((JSONObject) JSONValue.parse(reader)) - .get(JsonTags.SHARELIB_LIB_UPDATE); + Object sharelib = (Object) JSONValue.parse(reader); bf.append("[ShareLib update status]").append(System.getProperty("line.separator")); - for (Object key : json.keySet()) { - bf.append(" ").append(key).append(" = ").append(json.get(key)) + if (sharelib instanceof JSONArray) { + for (Object o : ((JSONArray) sharelib)) { + JSONObject obj = (JSONObject) ((JSONObject) o).get(JsonTags.SHARELIB_LIB_UPDATE); + for (Object key : obj.keySet()) { + bf.append("\t").append(key).append(" = ").append(obj.get(key)) + .append(System.getProperty("line.separator")); + } + bf.append(System.getProperty("line.separator")); + } + return bf.toString(); + } + else{ + JSONObject obj = (JSONObject) ((JSONObject) sharelib).get(JsonTags.SHARELIB_LIB_UPDATE); + for (Object key : obj.keySet()) { + bf.append("\t").append(key).append(" = ").append(obj.get(key)) .append(System.getProperty("line.separator")); + } + bf.append(System.getProperty("line.separator")); } - - return bf.toString(); } else { handleError(conn); http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java index f65239d..0466ffe 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java +++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java @@ -166,6 +166,6 @@ public interface RestConstants { public static final String SHARE_LIB_REQUEST_KEY = "lib"; - public static final String SHARE_LIB_ALLSERVER_REQUEST = "allservers"; + public static final String ALL_SERVER_REQUEST = "allservers"; } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java index 99c16e0..27c97e6 100644 --- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java @@ -113,4 +113,25 @@ public class JobsConcurrencyService implements Service, Instrumentable { public Map<String, String> getServerUrls() { return urls; } + + /** + * Return a map of instance id to other Oozie servers URL in HA. This implementation always returns a empty map. + * + * @return A map of Oozie instance ids and URLs + * @throws Exception + */ + public Map<String, String> getOtherServerUrls() { + return new HashMap<String, String>(); + } + + /** + * Checks if rest request is for all server. This function always return + * false; + * + * @param params the HttpRequest param + * @return false. + */ + public boolean isAllServerRequest(Map<String, String[]> params) { + return false; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/ShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java index 9556620..791d1a9 100644 --- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java +++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TimeZone; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -103,8 +104,11 @@ public class ShareLibService implements Service, Instrumentable { fs = FileSystem.get(has.createJobConf(uri.getAuthority())); updateLauncherLib(); updateShareLib(); - purgeLibs(fs, LAUNCHER_PREFIX); - purgeLibs(fs, SHARED_LIB_PREFIX); + //Only one server should purge sharelib + if (Services.get().get(JobsConcurrencyService.class).isFirstServer()) { + purgeLibs(fs, LAUNCHER_PREFIX); + purgeLibs(fs, SHARED_LIB_PREFIX); + } } catch (Exception e) { LOG.error("Not able to cache shareLib. Admin need to issue oozlie cli command to update sharelib.", e); http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java index 42fce05..611b74c 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java @@ -25,6 +25,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.ZKUtils; @@ -170,4 +171,40 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements } return urls; } + + /** + * Return a map of instance id to Oozie server URL of other servers. This implementation always returns a map with + * where the key is the instance id and the value is the URL of each Oozie server that we can see in the service + * discovery in ZooKeeper. + * + * @return A map of Oozie instance ids and URLs + */ + @Override + public Map<String, String> getOtherServerUrls() { + Map<String, String> urls = new HashMap<String, String>(); + List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); + for (ServiceInstance<Map> oozie : oozies) { + Map<String, String> metadata = oozie.getPayload(); + String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); + + if (id.equals(zk.getZKId())) { + continue; + } + String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); + urls.put(id, url); + } + return urls; + } + + /** + * Checks if rest request is for all server. By default it's true. + * + * @param params the HttpRequest param + * @return false if allservers=false, else true; + */ + @Override + public boolean isAllServerRequest(Map<String, String[]> params) { + return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty() + || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java index c17a8aa..8dc8b4b 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java @@ -18,31 +18,24 @@ package org.apache.oozie.service; import java.io.BufferedReader; + import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.XLogStreamer; + import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.Writer; -import java.net.HttpURLConnection; -import java.net.URL; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.TreeMap; + import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.authentication.client.Authenticator; -import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; -import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.util.AuthUrlClient; import org.apache.oozie.util.SimpleTimestampedMessageParser; import org.apache.oozie.util.TimestampedMessageParser; import org.apache.oozie.util.XLog; @@ -54,11 +47,8 @@ import org.apache.oozie.util.ZKUtils; */ public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable { - private static final String ALL_SERVERS_PARAM = "allservers"; - private ZKUtils zk; private XLog log; - private Class<? extends Authenticator> AuthenticatorClass; /** * Initialize the log streaming service. @@ -76,11 +66,6 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); } log = XLog.getLog(this.getClass()); - try { - AuthenticatorClass = determineAuthenticatorClassType(); - } catch (Exception ex) { - throw new ServiceException(ErrorCode.E0100, ex); - } } /** @@ -121,8 +106,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv XLogService xLogService = Services.get().get(XLogService.class); if (xLogService.getLogOverWS()) { // If ALL_SERVERS_PARAM is set to false, then only stream our log - if (params.get(ALL_SERVERS_PARAM) != null && params.get(ALL_SERVERS_PARAM).length > 0 - && params.get(ALL_SERVERS_PARAM)[0].equals("false")) { + if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); } @@ -175,7 +159,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB); try { - BufferedReader reader = fetchOtherLog(otherUrl, jobId); + // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie + // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) + final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB + + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG + + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"; + + BufferedReader reader = AuthUrlClient.callServer(url); parsers.add(new SimpleTimestampedMessageParser(reader, filter)); } catch(IOException ioe) { @@ -247,82 +237,4 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv writer.flush(); } } - - /** - * Creates a connection over HTTP to another Oozie server and asks it for the logs related to the jobId. - * - * @param otherUrl The URL of the other Oozie server - * @param jobId The job id of the job we want logs for - * @return a BufferedReader of the logs from the other server for the jobId - * @throws IOException If there was a problem connecting to the other Oozie server - */ - private BufferedReader fetchOtherLog(String otherUrl, String jobId) throws IOException { - // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie Server from trying - // aggregate logs from the other Oozie servers (and creating an infinite recursion) - final URL url = new URL(otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB + "/" + jobId - + "?" + RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG + "&" + ALL_SERVERS_PARAM + "=false"); - - log.debug("Fetching logs from [{0}]", url); - BufferedReader reader = null; - try { - reader = UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<BufferedReader>() { - @Override - public BufferedReader run() throws IOException { - HttpURLConnection conn = getConnection(url); - BufferedReader reader = null; - if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { - InputStream is = conn.getInputStream(); - reader = new BufferedReader(new InputStreamReader(is)); - } - return reader; - } - }); - } - catch (InterruptedException ie) { - throw new IOException(ie); - } - return reader; - } - - private HttpURLConnection getConnection(URL url) throws IOException { - AuthenticatedURL.Token token = new AuthenticatedURL.Token(); - HttpURLConnection conn; - try { - conn = new AuthenticatedURL(AuthenticatorClass.newInstance()).openConnection(url, token); - } - catch (AuthenticationException ex) { - throw new IOException("Could not authenticate, " + ex.getMessage(), ex); - } catch (InstantiationException ex) { - throw new IOException("Could not authenticate, " + ex.getMessage(), ex); - } catch (IllegalAccessException ex) { - throw new IOException("Could not authenticate, " + ex.getMessage(), ex); - } - if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { - throw new IOException("Unexpected response code [" + conn.getResponseCode() + "], message [" + conn.getResponseMessage() - + "]"); - } - return conn; - } - - @SuppressWarnings("unchecked") - private Class<? extends Authenticator> determineAuthenticatorClassType() throws Exception { - // Adapted from org.apache.hadoop.security.authentication.server.AuthenticationFilter#init - Class<? extends Authenticator> authClass; - String authName = Services.get().getConf().get("oozie.authentication.type"); - String authClassName; - if (authName == null) { - throw new IOException("Authentication type must be specified: simple|kerberos|<class>"); - } - authName = authName.trim(); - if (authName.equals("simple")) { - authClassName = PseudoAuthenticator.class.getName(); - } else if (authName.equals("kerberos")) { - authClassName = KerberosAuthenticator.class.getName(); - } else { - authClassName = authName; - } - - authClass = (Class<? extends Authenticator>) Thread.currentThread().getContextClassLoader().loadClass(authClassName); - return authClass; - } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java index 091070f..29d7bd6 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java @@ -18,6 +18,7 @@ package org.apache.oozie.servlet; import java.io.IOException; +import java.io.Reader; import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -33,11 +34,14 @@ import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.service.AuthorizationException; import org.apache.oozie.service.AuthorizationService; import org.apache.oozie.service.InstrumentationService; +import org.apache.oozie.service.JobsConcurrencyService; import org.apache.oozie.service.Services; import org.apache.oozie.service.ShareLibService; +import org.apache.oozie.util.AuthUrlClient; import org.apache.oozie.util.Instrumentation; import org.json.simple.JSONArray; import org.json.simple.JSONObject; +import org.json.simple.JSONValue; public abstract class BaseAdminServlet extends JsonRestServlet { @@ -206,7 +210,7 @@ public abstract class BaseAdminServlet extends JsonRestServlet { } /** - * Update share lib. + * Update share lib. support HA * * @param request the request * @param response the response @@ -214,42 +218,45 @@ public abstract class BaseAdminServlet extends JsonRestServlet { */ @SuppressWarnings("unchecked") public void updateShareLib(HttpServletRequest request, HttpServletResponse response) throws IOException { + JSONArray jsonArray = new JSONArray(); + JobsConcurrencyService jc = Services.get().get(JobsConcurrencyService.class); + if (jc.isAllServerRequest(request.getParameterMap())) { + Map<String, String> servers = jc.getOtherServerUrls(); + for (String otherUrl : servers.values()) { + // It's important that we specify ALL_SERVERS_PARAM=false, so that other oozie server should not call other oozie + //servers to update sharelib (and creating an infinite recursion) + String serverUrl = otherUrl + "/v2/admin/" + RestConstants.ADMIN_UPDATE_SHARELIB + "?" + + RestConstants.ALL_SERVER_REQUEST + "=false"; + try { + Reader reader = AuthUrlClient.callServer(serverUrl); + JSONObject json = (JSONObject) JSONValue.parse(reader); + jsonArray.add(json); + } + catch (Exception e) { + JSONObject errorJson = new JSONObject(); + errorJson.put(JsonTags.SHARELIB_UPDATE_HOST, otherUrl); + errorJson.put(JsonTags.SHARELIB_UPDATE_STATUS, e.getMessage()); + JSONObject newJson = new JSONObject(); + newJson.put(JsonTags.SHARELIB_LIB_UPDATE, errorJson); + jsonArray.add(newJson); + } + } + //For current server + JSONObject newJson = new JSONObject(); + newJson.put(JsonTags.SHARELIB_LIB_UPDATE, updateLocalShareLib(request)); + jsonArray.add(newJson); + sendJsonResponse(response, HttpServletResponse.SC_OK, jsonArray); + } + else { + JSONObject newJson = new JSONObject(); + newJson.put(JsonTags.SHARELIB_LIB_UPDATE, updateLocalShareLib(request)); + sendJsonResponse(response, HttpServletResponse.SC_OK, newJson); + } + } - //TODO sharelib HA. -// if (Boolean.parseBoolean(request.getParameter(RestConstants.SHARE_LIB_ALLSERVER_REQUEST))) { -// JSONArray jsonArray = new JSONArray(); -// JSONObject json = new JSONObject(); -// try { -// JobsConcurrencyService jc = Services.get().get(JobsConcurrencyService.class); -// Map<String, String> serverList = jc.getServerUrls(); -// for (String server : serverList.values()) { -// String serverUrl = server + "/v2/admin/" + RestConstants.ADMIN_UPDATE_SHARELIB + "?" -// + RestConstants.SHARE_LIB_ALLSERVER_REQUEST + "=false"; -// try { -// jsonArray.add(callServer(serverUrl, request)); -// } -// catch (Exception e) { -// JSONObject errorJson = new JSONObject(); -// errorJson.put(JsonTags.SHARELIB_UPDATE_HOST, server); -// errorJson.put(JsonTags.SHARELIB_UPDATE_STATUS, e.getMessage()); -// jsonArray.add(errorJson); -// -// } -// } -// json.put(JsonTags.SHARELIB_LIB_UPDATE, jsonArray); -// sendJsonResponse(response, HttpServletResponse.SC_OK, json); -// } -// -// catch (Exception e) { -// sendErrorResponse(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "internal Error", -// e.getMessage()); -// } -// -// } -// else { + @SuppressWarnings("unchecked") + private JSONObject updateLocalShareLib(HttpServletRequest request) { ShareLibService shareLibService = Services.get().get(ShareLibService.class); - JSONObject status = new JSONObject(); - JSONObject json = new JSONObject(); json.put(JsonTags.SHARELIB_UPDATE_HOST, request.getServerName() + ":" + request.getServerPort()); try { @@ -259,31 +266,9 @@ public abstract class BaseAdminServlet extends JsonRestServlet { catch (Exception e) { json.put(JsonTags.SHARELIB_UPDATE_STATUS, e.getClass().getName() + ": " + e.getMessage()); } - status.put(JsonTags.SHARELIB_LIB_UPDATE, json); - sendJsonResponse(response, HttpServletResponse.SC_OK, status); - - // } - + return json; } -// private JSONObject callServer(String url, HttpServletRequest request) throws MalformedURLException, IOException { -// HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); -// conn.setRequestMethod("GET"); -// -// Enumeration headerNames = request.getHeaderNames(); -// while (headerNames.hasMoreElements()) { -// String headerName = (String) headerNames.nextElement(); -// -// conn.setRequestProperty(headerName, request.getHeader(headerName)); -// } -// conn.connect(); -// -// Reader reader = new InputStreamReader(conn.getInputStream()); -// JSONObject json = (JSONObject) JSONValue.parse(reader); -// return (JSONObject) json.get(JsonTags.SHARELIB_LIB_UPDATE); -// -// } - /** * Authorize request. * http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java index 4a0067f..0ba4332 100644 --- a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java +++ b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java @@ -18,8 +18,11 @@ package org.apache.oozie.service; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.ConfigUtils; @@ -87,4 +90,24 @@ public class TestJobsConcurrencyService extends XTestCase { jcs.destroy(); } } + + public void testsAllServerRequest() throws Exception { + JobsConcurrencyService jcs = new JobsConcurrencyService(); + try { + jcs.init(null); + assertFalse(jcs.isAllServerRequest(null)); + Map<String, String[]> param = new HashMap<String, String[]>(); + assertFalse(jcs.isAllServerRequest(param)); + param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "test" }); + assertFalse(jcs.isAllServerRequest(param)); + param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "true" }); + assertFalse(jcs.isAllServerRequest(param)); + param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "false" }); + assertFalse(jcs.isAllServerRequest(param)); + } + finally { + jcs.destroy(); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java index 330b620..644a76e 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java @@ -18,8 +18,11 @@ package org.apache.oozie.service; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.test.ZKXTestCase; import org.apache.oozie.util.ConfigUtils; import org.apache.oozie.util.ZKUtils; @@ -244,4 +247,23 @@ public class TestZKJobsConcurrencyService extends ZKXTestCase { } } } + + public void testisAllServerRequest() throws Exception { + ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService(); + try { + zkjcs.init(Services.get()); + assertTrue(zkjcs.isAllServerRequest(null)); + Map<String, String[]> param = new HashMap<String, String[]>(); + assertTrue(zkjcs.isAllServerRequest(param)); + param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "test" }); + assertTrue(zkjcs.isAllServerRequest(param)); + param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "true" }); + assertTrue(zkjcs.isAllServerRequest(param)); + param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "false" }); + assertFalse(zkjcs.isAllServerRequest(param)); + } + finally { + zkjcs.destroy(); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java index c869b12..29bca41 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java @@ -195,7 +195,9 @@ public class TestZKXLogStreamingService extends ZKXTestCase { StringWriter w = new StringWriter(); ZKXLogStreamingService zkxlss = new ZKXLogStreamingService(); try { - zkxlss.init(Services.get()); + Services services=Services.get(); + services.setService(ZKJobsConcurrencyService.class); + zkxlss.init(services); sleep(1000); // Sleep to allow ZKUtils ServiceCache to update zkxlss.streamLog(xf, null, null, w, new HashMap<String, String[]>()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/docs/src/site/twiki/DG_CommandLineTool.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki index af472d3..0748ff8 100644 --- a/docs/src/site/twiki/DG_CommandLineTool.twiki +++ b/docs/src/site/twiki/DG_CommandLineTool.twiki @@ -847,7 +847,7 @@ $ oozie admin -oozie http://localhost:11000/oozie -sharelib pig* </verbatim> ---+++ Update system sharelib -This command makes the oozie server to pick up the latest version of sharelib present +This command makes the oozie server(s) to pick up the latest version of sharelib present under oozie.service.WorkflowAppService.system.libpath directory based on the sharelib directory timestamp or reloads the sharelib metafile if one is configured. The main purpose is to update the sharelib on the oozie server without restarting. @@ -855,21 +855,35 @@ the sharelib metafile if one is configured. The main purpose is to update the sh $ oozie admin -oozie http://localhost:11000/oozie -sharelibupdate [ShareLib update status] ShareLib update status] - host = localhost:11000 - status = Successful - sharelibDirOld = hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729 - sharelibDirNew = hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343 + host = host1:8080 + status = Successful + sharelibDirOld = hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729 + sharelibDirNew = hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343 + + host = host2:8080 + status = Successful + sharelibDirOld = hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729 + sharelibDirNew = hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343 + + host = host3:8080 + status = Server not found </verbatim> Sharelib update for metafile configuration. <verbatim> $ oozie admin -oozie http://localhost:11000/oozie -sharelibupdate [ShareLib update status] - host = localhost:11000 - status = Successful - sharelibMetaFile = hdfs://localhost:9000/user/purushah/sharelib_metafile.property - sharelibMetaFileOldTimeStamp = Thu, 21 Nov 2013 00:40:04 GMT - sharelibMetaFileNewTimeStamp = Thu, 21 Nov 2013 01:01:25 GMT + host = host1 + status = Successful + sharelibMetaFile = hdfs://localhost:9000/user/purushah/sharelib_metafile.property + sharelibMetaFileOldTimeStamp = Thu, 21 Nov 2013 00:40:04 GMT + sharelibMetaFileNewTimeStamp = Thu, 21 Nov 2013 01:01:25 GMT + + host = host2 + status = Successful + sharelibMetaFile = hdfs://localhost:9000/user/purushah/sharelib_metafile.property + sharelibMetaFileOldTimeStamp = Thu, 21 Nov 2013 00:40:04 GMT + sharelibMetaFileNewTimeStamp = Thu, 21 Nov 2013 01:01:25 GMT </verbatim> #SLAOperations http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/docs/src/site/twiki/WebServicesAPI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WebServicesAPI.twiki b/docs/src/site/twiki/WebServicesAPI.twiki index 50795b4..4b20075 100644 --- a/docs/src/site/twiki/WebServicesAPI.twiki +++ b/docs/src/site/twiki/WebServicesAPI.twiki @@ -441,7 +441,7 @@ Content-Type: application/json;charset=UTF-8 ---++++ Update system sharelib -This webservice call makes the oozie server to pick up the latest version of sharelib present +This webservice call makes the oozie server(s) to pick up the latest version of sharelib present under oozie.service.WorkflowAppService.system.libpath directory based on the sharelib directory timestamp or reloads the sharelib metafile if one is configured. The main purpose is to update the sharelib on the oozie server without restarting. @@ -457,18 +457,30 @@ GET /oozie/v2/admin/update_sharelib <verbatim> HTTP/1.1 200 OK Content-Type: application/json;charset=UTF-8 - -{ - { - "sharelibUpdate": - { - "sharelibDirOld":"hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729", - "sharelibDirNew":"hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343", - "host":"localhost:11000", - "status":"Successful" - } +[ + { + "sharelibUpdate":{ + "host":"server1", + "status":"Server not found" } -} + }, + { + "sharelibUpdate":{ + "host":"server2", + "status":"Successful", + "sharelibDirOld":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218", + "sharelibDirNew":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218" + } + }, + { + "sharelibUpdate":{ + "host":"server3", + "status":"Successful", + "sharelibDirOld":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218", + "sharelibDirNew":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218" + } + } +] </verbatim> ---+++ Job and Jobs End-Points http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 35726b9..7bf006e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1609 HA support for share lib. (puru via rkanter) OOZIE-1622 Multiple CoordSubmit for same bundle (shwethags via virag) OOZIE-1644 Default config from config-default.xml is not propagated to actions (mona) OOZIE-1645 Oozie upgrade DB command fails due to missing dependencies for mssql (omaliuvanchuk via rkanter)
