Updated Branches:
  refs/heads/master 67e54f610 -> 2a2e2139b

OOZIE-1609 HA support for share lib. (puru via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2a2e2139
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2a2e2139
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2a2e2139

Branch: refs/heads/master
Commit: 2a2e2139b58ad1bec882870965b125fbdbf9123f
Parents: 67e54f6
Author: Robert Kanter <[email protected]>
Authored: Wed Jan 29 14:14:33 2014 -0800
Committer: Robert Kanter <[email protected]>
Committed: Wed Jan 29 14:14:33 2014 -0800

----------------------------------------------------------------------
 .../org/apache/oozie/client/OozieClient.java    |  27 +++--
 .../apache/oozie/client/rest/RestConstants.java |   2 +-
 .../oozie/service/JobsConcurrencyService.java   |  21 ++++
 .../apache/oozie/service/ShareLibService.java   |   8 +-
 .../oozie/service/ZKJobsConcurrencyService.java |  37 ++++++
 .../oozie/service/ZKXLogStreamingService.java   | 112 ++-----------------
 .../apache/oozie/servlet/BaseAdminServlet.java  | 101 +++++++----------
 .../service/TestJobsConcurrencyService.java     |  23 ++++
 .../service/TestZKJobsConcurrencyService.java   |  22 ++++
 .../service/TestZKXLogStreamingService.java     |   4 +-
 docs/src/site/twiki/DG_CommandLineTool.twiki    |  34 ++++--
 docs/src/site/twiki/WebServicesAPI.twiki        |  36 ++++--
 release-log.txt                                 |   1 +
 13 files changed, 237 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java 
b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index 9d6c9e0..b0a85fd 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -1521,7 +1521,8 @@ public class OozieClient {
     private  class UpdateSharelib extends ClientCallable<String> {
 
         UpdateSharelib() {
-            super("GET", RestConstants.ADMIN, 
RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams());
+            super("GET", RestConstants.ADMIN, 
RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams(
+                    RestConstants.ALL_SERVER_REQUEST, "true"));
         }
 
         @Override
@@ -1529,15 +1530,27 @@ public class OozieClient {
             StringBuffer bf = new StringBuffer();
             if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
                 Reader reader = new InputStreamReader(conn.getInputStream());
-                JSONObject json = (JSONObject) ((JSONObject) 
JSONValue.parse(reader))
-                        .get(JsonTags.SHARELIB_LIB_UPDATE);
+                Object sharelib = (Object) JSONValue.parse(reader);
                 bf.append("[ShareLib update 
status]").append(System.getProperty("line.separator"));
-                    for (Object key : json.keySet()) {
-                        bf.append(" ").append(key).append(" = 
").append(json.get(key))
+                if (sharelib instanceof JSONArray) {
+                    for (Object o : ((JSONArray) sharelib)) {
+                        JSONObject obj = (JSONObject) ((JSONObject) 
o).get(JsonTags.SHARELIB_LIB_UPDATE);
+                        for (Object key : obj.keySet()) {
+                            bf.append("\t").append(key).append(" = 
").append(obj.get(key))
+                                    
.append(System.getProperty("line.separator"));
+                        }
+                        bf.append(System.getProperty("line.separator"));
+                    }
+                    return bf.toString();
+                }
+                else{
+                    JSONObject obj = (JSONObject) ((JSONObject) 
sharelib).get(JsonTags.SHARELIB_LIB_UPDATE);
+                    for (Object key : obj.keySet()) {
+                        bf.append("\t").append(key).append(" = 
").append(obj.get(key))
                                 .append(System.getProperty("line.separator"));
+                    }
+                    bf.append(System.getProperty("line.separator"));
                 }
-
-                return bf.toString();
             }
             else {
                 handleError(conn);

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java 
b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
index f65239d..0466ffe 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
@@ -166,6 +166,6 @@ public interface RestConstants {
 
     public static final String SHARE_LIB_REQUEST_KEY = "lib";
 
-    public static final String SHARE_LIB_ALLSERVER_REQUEST = "allservers";
+    public static final String ALL_SERVER_REQUEST = "allservers";
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java 
b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 99c16e0..27c97e6 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -113,4 +113,25 @@ public class JobsConcurrencyService implements Service, 
Instrumentable {
     public Map<String, String> getServerUrls() {
         return urls;
     }
+
+    /**
+     * Return a map of instance id to other Oozie servers URL in HA.  This 
implementation always returns an empty map.
+     *
+     * @return A map of Oozie instance ids and URLs
+     * @throws Exception
+     */
+    public Map<String, String> getOtherServerUrls() {
+        return new HashMap<String, String>();
+    }
+
+    /**
+     * Checks if the REST request is for all servers. This function always returns
+     * false.
+     *
+     * @param params the HttpRequest param
+     * @return false.
+     */
+    public boolean isAllServerRequest(Map<String, String[]> params) {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/ShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java 
b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
index 9556620..791d1a9 100644
--- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java
+++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -103,8 +104,11 @@ public class ShareLibService implements Service, 
Instrumentable {
             fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
             updateLauncherLib();
             updateShareLib();
-            purgeLibs(fs, LAUNCHER_PREFIX);
-            purgeLibs(fs, SHARED_LIB_PREFIX);
+            //Only one server should purge sharelib
+            if 
(Services.get().get(JobsConcurrencyService.class).isFirstServer()) {
+                purgeLibs(fs, LAUNCHER_PREFIX);
+                purgeLibs(fs, SHARED_LIB_PREFIX);
+            }
         }
         catch (Exception e) {
             LOG.error("Not able to cache shareLib. Admin need to issue oozlie 
cli command to update sharelib.", e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java 
b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 42fce05..611b74c 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -25,6 +25,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.ZKUtils;
@@ -170,4 +171,40 @@ public class ZKJobsConcurrencyService extends 
JobsConcurrencyService implements
         }
         return urls;
     }
+
+    /**
+     * Return a map of instance id to Oozie server URL of the other servers.  This 
implementation returns a map
+     * where the key is the instance id and the value is the URL of each other Oozie 
server that we can see in the service
+     * discovery in ZooKeeper (the entry whose id matches this server's ZK id is skipped).
+     *
+     * @return A map of Oozie instance ids and URLs
+     */
+    @Override
+    public Map<String, String> getOtherServerUrls() {
+        Map<String, String> urls = new HashMap<String, String>();
+        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+        for (ServiceInstance<Map> oozie : oozies) {
+            Map<String, String> metadata = oozie.getPayload();
+            String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
+
+            if (id.equals(zk.getZKId())) {
+                continue;
+            }
+            String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
+            urls.put(id, url);
+        }
+        return urls;
+    }
+
+    /**
+     * Checks if the REST request is for all servers. True unless allservers is explicitly set to false.
+     *
+     * @param params the HttpRequest param
+     * @return false if allservers=false, else true;
+     */
+    @Override
+    public boolean isAllServerRequest(Map<String, String[]> params) {
+        return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) 
== null || params.isEmpty()
+                || 
!params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false");
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java 
b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index c17a8aa..8dc8b4b 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -18,31 +18,24 @@
 package org.apache.oozie.service;
 
 import java.io.BufferedReader;
+
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLogStreamer;
+
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Writer;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+
 import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.Authenticator;
-import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
-import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.util.AuthUrlClient;
 import org.apache.oozie.util.SimpleTimestampedMessageParser;
 import org.apache.oozie.util.TimestampedMessageParser;
 import org.apache.oozie.util.XLog;
@@ -54,11 +47,8 @@ import org.apache.oozie.util.ZKUtils;
  */
 public class ZKXLogStreamingService extends XLogStreamingService implements 
Service, Instrumentable {
 
-    private static final String ALL_SERVERS_PARAM = "allservers";
-
     private ZKUtils zk;
     private XLog log;
-    private Class<? extends Authenticator> AuthenticatorClass;
 
     /**
      * Initialize the log streaming service.
@@ -76,11 +66,6 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
             throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
         }
         log = XLog.getLog(this.getClass());
-        try {
-            AuthenticatorClass = determineAuthenticatorClassType();
-        } catch (Exception ex) {
-            throw new ServiceException(ErrorCode.E0100, ex);
-        }
     }
 
     /**
@@ -121,8 +106,7 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
         XLogService xLogService = Services.get().get(XLogService.class);
         if (xLogService.getLogOverWS()) {
             // If ALL_SERVERS_PARAM is set to false, then only stream our log
-            if (params.get(ALL_SERVERS_PARAM) != null && 
params.get(ALL_SERVERS_PARAM).length > 0
-                    && params.get(ALL_SERVERS_PARAM)[0].equals("false")) {
+            if 
(!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
                 new XLogStreamer(filter, xLogService.getOozieLogPath(), 
xLogService.getOozieLogName(),
                         xLogService.getOozieLogRotation()).streamLog(writer, 
startTime, endTime, bufferLen);
             }
@@ -175,7 +159,13 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
                     String otherUrl = 
oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
                     String jobId = 
filter.getFilterParams().get(DagXLogInfoService.JOB);
                     try {
-                        BufferedReader reader = fetchOtherLog(otherUrl, jobId);
+                     // It's important that we specify ALL_SERVER_REQUEST=false 
in the GET request to prevent the other Oozie
+                     // Server from trying to aggregate logs from the other Oozie 
servers (and creating an infinite recursion)
+                        final String url = otherUrl + "/v" + 
OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
+                                + "/" + jobId + "?" + 
RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG
+                                + "&" + RestConstants.ALL_SERVER_REQUEST + 
"=false";
+
+                        BufferedReader reader = AuthUrlClient.callServer(url);
                         parsers.add(new SimpleTimestampedMessageParser(reader, 
filter));
                     }
                     catch(IOException ioe) {
@@ -247,82 +237,4 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
             writer.flush();
         }
     }
-
-    /**
-     * Creates a connection over HTTP to another Oozie server and asks it for 
the logs related to the jobId.
-     *
-     * @param otherUrl The URL of the other Oozie server
-     * @param jobId The job id of the job we want logs for
-     * @return a BufferedReader of the logs from the other server for the jobId
-     * @throws IOException If there was a problem connecting to the other 
Oozie server
-     */
-    private BufferedReader fetchOtherLog(String otherUrl, String jobId) throws 
IOException {
-        // It's important that we specify ALL_SERVERS_PARAM=false in the GET 
request to prevent the other Oozie Server from trying
-        // aggregate logs from the other Oozie servers (and creating an 
infinite recursion)
-        final URL url = new URL(otherUrl + "/v" + 
OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB + "/" + jobId
-                + "?" + RestConstants.JOB_SHOW_PARAM + "=" + 
RestConstants.JOB_SHOW_LOG + "&" + ALL_SERVERS_PARAM + "=false");
-
-        log.debug("Fetching logs from [{0}]", url);
-        BufferedReader reader = null;
-        try {
-            reader = UserGroupInformation.getLoginUser().doAs(new 
PrivilegedExceptionAction<BufferedReader>() {
-                @Override
-                public BufferedReader run() throws IOException {
-                    HttpURLConnection conn = getConnection(url);
-                    BufferedReader reader = null;
-                    if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) 
{
-                        InputStream is = conn.getInputStream();
-                        reader = new BufferedReader(new InputStreamReader(is));
-                    }
-                    return reader;
-                }
-            });
-        }
-        catch (InterruptedException ie) {
-            throw new IOException(ie);
-        }
-        return reader;
-    }
-
-    private HttpURLConnection getConnection(URL url) throws IOException {
-        AuthenticatedURL.Token token = new AuthenticatedURL.Token();
-        HttpURLConnection conn;
-        try {
-            conn = new 
AuthenticatedURL(AuthenticatorClass.newInstance()).openConnection(url, token);
-        }
-        catch (AuthenticationException ex) {
-            throw new IOException("Could not authenticate, " + 
ex.getMessage(), ex);
-        } catch (InstantiationException ex) {
-            throw new IOException("Could not authenticate, " + 
ex.getMessage(), ex);
-        } catch (IllegalAccessException ex) {
-            throw new IOException("Could not authenticate, " + 
ex.getMessage(), ex);
-        }
-        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-            throw new IOException("Unexpected response code [" + 
conn.getResponseCode() + "], message [" + conn.getResponseMessage()
-                    + "]");
-        }
-        return conn;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Class<? extends Authenticator> determineAuthenticatorClassType() 
throws Exception {
-        // Adapted from 
org.apache.hadoop.security.authentication.server.AuthenticationFilter#init
-        Class<? extends Authenticator> authClass;
-        String authName = 
Services.get().getConf().get("oozie.authentication.type");
-        String authClassName;
-        if (authName == null) {
-            throw new IOException("Authentication type must be specified: 
simple|kerberos|<class>");
-        }
-        authName = authName.trim();
-        if (authName.equals("simple")) {
-            authClassName = PseudoAuthenticator.class.getName();
-        } else if (authName.equals("kerberos")) {
-            authClassName = KerberosAuthenticator.class.getName();
-        } else {
-            authClassName = authName;
-        }
-
-        authClass = (Class<? extends Authenticator>) 
Thread.currentThread().getContextClassLoader().loadClass(authClassName);
-        return authClass;
-    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java 
b/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
index 091070f..29d7bd6 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
@@ -18,6 +18,7 @@
 package org.apache.oozie.servlet;
 
 import java.io.IOException;
+import java.io.Reader;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -33,11 +34,14 @@ import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.AuthorizationException;
 import org.apache.oozie.service.AuthorizationService;
 import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.JobsConcurrencyService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
+import org.apache.oozie.util.AuthUrlClient;
 import org.apache.oozie.util.Instrumentation;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 
 public abstract class BaseAdminServlet extends JsonRestServlet {
 
@@ -206,7 +210,7 @@ public abstract class BaseAdminServlet extends 
JsonRestServlet {
     }
 
     /**
-     * Update share lib.
+     * Update share lib. Supports HA.
      *
      * @param request the request
      * @param response the response
@@ -214,42 +218,45 @@ public abstract class BaseAdminServlet extends 
JsonRestServlet {
      */
     @SuppressWarnings("unchecked")
     public void updateShareLib(HttpServletRequest request, HttpServletResponse 
response) throws IOException {
+        JSONArray jsonArray = new JSONArray();
+        JobsConcurrencyService jc = 
Services.get().get(JobsConcurrencyService.class);
+        if (jc.isAllServerRequest(request.getParameterMap())) {
+            Map<String, String> servers = jc.getOtherServerUrls();
+            for (String otherUrl : servers.values()) {
+                // It's important that we specify ALL_SERVER_REQUEST=false, so 
that the other Oozie server does not call the other Oozie
+                // servers to update sharelib (creating an infinite 
recursion)
+                String serverUrl = otherUrl + "/v2/admin/" + 
RestConstants.ADMIN_UPDATE_SHARELIB + "?"
+                        + RestConstants.ALL_SERVER_REQUEST + "=false";
+                try {
+                    Reader reader = AuthUrlClient.callServer(serverUrl);
+                    JSONObject json = (JSONObject) JSONValue.parse(reader);
+                    jsonArray.add(json);
+                }
+                catch (Exception e) {
+                    JSONObject errorJson = new JSONObject();
+                    errorJson.put(JsonTags.SHARELIB_UPDATE_HOST, otherUrl);
+                    errorJson.put(JsonTags.SHARELIB_UPDATE_STATUS, 
e.getMessage());
+                    JSONObject newJson = new JSONObject();
+                    newJson.put(JsonTags.SHARELIB_LIB_UPDATE, errorJson);
+                    jsonArray.add(newJson);
+                }
+            }
+            //For current server
+            JSONObject newJson = new JSONObject();
+            newJson.put(JsonTags.SHARELIB_LIB_UPDATE, 
updateLocalShareLib(request));
+            jsonArray.add(newJson);
+            sendJsonResponse(response, HttpServletResponse.SC_OK, jsonArray);
+        }
+        else {
+            JSONObject newJson = new JSONObject();
+            newJson.put(JsonTags.SHARELIB_LIB_UPDATE, 
updateLocalShareLib(request));
+            sendJsonResponse(response, HttpServletResponse.SC_OK, newJson);
+        }
+    }
 
-        //TODO sharelib HA.
-//        if 
(Boolean.parseBoolean(request.getParameter(RestConstants.SHARE_LIB_ALLSERVER_REQUEST)))
 {
-//            JSONArray jsonArray = new JSONArray();
-//            JSONObject json = new JSONObject();
-//            try {
-//                JobsConcurrencyService jc = 
Services.get().get(JobsConcurrencyService.class);
-//                Map<String, String> serverList = jc.getServerUrls();
-//                for (String server : serverList.values()) {
-//                    String serverUrl = server + "/v2/admin/" + 
RestConstants.ADMIN_UPDATE_SHARELIB + "?"
-//                            + RestConstants.SHARE_LIB_ALLSERVER_REQUEST + 
"=false";
-//                    try {
-//                        jsonArray.add(callServer(serverUrl, request));
-//                    }
-//                    catch (Exception e) {
-//                        JSONObject errorJson = new JSONObject();
-//                        errorJson.put(JsonTags.SHARELIB_UPDATE_HOST, server);
-//                        errorJson.put(JsonTags.SHARELIB_UPDATE_STATUS, 
e.getMessage());
-//                        jsonArray.add(errorJson);
-//
-//                    }
-//                }
-//                json.put(JsonTags.SHARELIB_LIB_UPDATE, jsonArray);
-//                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
-//            }
-//
-//            catch (Exception e) {
-//                sendErrorResponse(response, 
HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "internal Error",
-//                        e.getMessage());
-//            }
-//
-//        }
-//        else {
+    @SuppressWarnings("unchecked")
+    private JSONObject updateLocalShareLib(HttpServletRequest request) {
         ShareLibService shareLibService = 
Services.get().get(ShareLibService.class);
-        JSONObject status = new JSONObject();
-
         JSONObject json = new JSONObject();
         json.put(JsonTags.SHARELIB_UPDATE_HOST, request.getServerName() + ":" 
+ request.getServerPort());
         try {
@@ -259,31 +266,9 @@ public abstract class BaseAdminServlet extends 
JsonRestServlet {
         catch (Exception e) {
             json.put(JsonTags.SHARELIB_UPDATE_STATUS, e.getClass().getName() + 
": " + e.getMessage());
         }
-        status.put(JsonTags.SHARELIB_LIB_UPDATE, json);
-        sendJsonResponse(response, HttpServletResponse.SC_OK, status);
-
-        // }
-
+        return json;
     }
 
-//    private JSONObject callServer(String url, HttpServletRequest request) 
throws MalformedURLException, IOException {
-//        HttpURLConnection conn = (HttpURLConnection) new 
URL(url).openConnection();
-//        conn.setRequestMethod("GET");
-//
-//        Enumeration headerNames = request.getHeaderNames();
-//        while (headerNames.hasMoreElements()) {
-//            String headerName = (String) headerNames.nextElement();
-//
-//            conn.setRequestProperty(headerName, 
request.getHeader(headerName));
-//        }
-//        conn.connect();
-//
-//        Reader reader = new InputStreamReader(conn.getInputStream());
-//        JSONObject json = (JSONObject) JSONValue.parse(reader);
-//        return (JSONObject) json.get(JsonTags.SHARELIB_LIB_UPDATE);
-//
-//    }
-
     /**
      * Authorize request.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java 
b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
index 4a0067f..0ba4332 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
@@ -18,8 +18,11 @@
 package org.apache.oozie.service;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.ConfigUtils;
 
@@ -87,4 +90,24 @@ public class TestJobsConcurrencyService extends XTestCase {
             jcs.destroy();
         }
     }
+
+    public void testsAllServerRequest() throws Exception {
+        JobsConcurrencyService jcs = new JobsConcurrencyService();
+        try {
+            jcs.init(null);
+            assertFalse(jcs.isAllServerRequest(null));
+            Map<String, String[]> param = new HashMap<String, String[]>();
+            assertFalse(jcs.isAllServerRequest(param));
+            param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "test" 
});
+            assertFalse(jcs.isAllServerRequest(param));
+            param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "true" 
});
+            assertFalse(jcs.isAllServerRequest(param));
+            param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "false" 
});
+            assertFalse(jcs.isAllServerRequest(param));
+        }
+        finally {
+            jcs.destroy();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java 
b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
index 330b620..644a76e 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
@@ -18,8 +18,11 @@
 package org.apache.oozie.service;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.ConfigUtils;
 import org.apache.oozie.util.ZKUtils;
@@ -244,4 +247,23 @@ public class TestZKJobsConcurrencyService extends 
ZKXTestCase {
             }
         }
     }
+
+    public void testisAllServerRequest() throws Exception {
+        ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
+        try {
+            zkjcs.init(Services.get());
+            assertTrue(zkjcs.isAllServerRequest(null));
+            Map<String, String[]> param = new HashMap<String, String[]>();
+            assertTrue(zkjcs.isAllServerRequest(param));
+            param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "test" 
});
+            assertTrue(zkjcs.isAllServerRequest(param));
+            param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "true" 
});
+            assertTrue(zkjcs.isAllServerRequest(param));
+            param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "false" 
});
+            assertFalse(zkjcs.isAllServerRequest(param));
+        }
+        finally {
+            zkjcs.destroy();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java 
b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index c869b12..29bca41 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -195,7 +195,9 @@ public class TestZKXLogStreamingService extends ZKXTestCase 
{
         StringWriter w = new StringWriter();
         ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
         try {
-            zkxlss.init(Services.get());
+            Services services=Services.get();
+            services.setService(ZKJobsConcurrencyService.class);
+            zkxlss.init(services);
             sleep(1000);    // Sleep to allow ZKUtils ServiceCache to update
             zkxlss.streamLog(xf, null, null, w, new HashMap<String, 
String[]>());
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/docs/src/site/twiki/DG_CommandLineTool.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki 
b/docs/src/site/twiki/DG_CommandLineTool.twiki
index af472d3..0748ff8 100644
--- a/docs/src/site/twiki/DG_CommandLineTool.twiki
+++ b/docs/src/site/twiki/DG_CommandLineTool.twiki
@@ -847,7 +847,7 @@ $ oozie admin -oozie http://localhost:11000/oozie -sharelib 
pig*
 </verbatim>
 
 ---+++ Update system sharelib
-This command makes the oozie server to pick up the latest version of sharelib 
present
+This command makes the oozie server(s) pick up the latest version of 
sharelib present
 under oozie.service.WorkflowAppService.system.libpath directory based on the 
sharelib directory timestamp or reloads
 the sharelib metafile if one is configured. The main purpose is to update the 
sharelib on the oozie server without restarting.
 
@@ -855,21 +855,35 @@ the sharelib metafile if one is configured. The main 
purpose is to update the sh
 $ oozie admin -oozie http://localhost:11000/oozie -sharelibupdate
 [ShareLib update status]
 ShareLib update status]
- host = localhost:11000
- status = Successful
- sharelibDirOld = 
hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729
- sharelibDirNew = 
hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343
+    host = host1:8080
+    status = Successful
+    sharelibDirOld = 
hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729
+    sharelibDirNew = 
hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343
+
+    host = host2:8080
+    status = Successful
+    sharelibDirOld = 
hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729
+    sharelibDirNew = 
hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343
+
+    host = host3:8080
+    status = Server not found
 </verbatim>
 
 Sharelib update for metafile configuration.
 <verbatim>
 $ oozie admin -oozie http://localhost:11000/oozie -sharelibupdate
 [ShareLib update status]
- host = localhost:11000
- status = Successful
- sharelibMetaFile = 
hdfs://localhost:9000/user/purushah/sharelib_metafile.property
- sharelibMetaFileOldTimeStamp = Thu, 21 Nov 2013 00:40:04 GMT
- sharelibMetaFileNewTimeStamp = Thu, 21 Nov 2013 01:01:25 GMT
+    host = host1
+    status = Successful
+    sharelibMetaFile = 
hdfs://localhost:9000/user/purushah/sharelib_metafile.property
+    sharelibMetaFileOldTimeStamp = Thu, 21 Nov 2013 00:40:04 GMT
+    sharelibMetaFileNewTimeStamp = Thu, 21 Nov 2013 01:01:25 GMT
+
+    host = host2
+    status = Successful
+    sharelibMetaFile = 
hdfs://localhost:9000/user/purushah/sharelib_metafile.property
+    sharelibMetaFileOldTimeStamp = Thu, 21 Nov 2013 00:40:04 GMT
+    sharelibMetaFileNewTimeStamp = Thu, 21 Nov 2013 01:01:25 GMT
 </verbatim>
 
 #SLAOperations

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/docs/src/site/twiki/WebServicesAPI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WebServicesAPI.twiki 
b/docs/src/site/twiki/WebServicesAPI.twiki
index 50795b4..4b20075 100644
--- a/docs/src/site/twiki/WebServicesAPI.twiki
+++ b/docs/src/site/twiki/WebServicesAPI.twiki
@@ -441,7 +441,7 @@ Content-Type: application/json;charset=UTF-8
 
 
 ---++++ Update system sharelib
-This webservice call makes the oozie server to pick up the latest version of 
sharelib present
+This webservice call makes the oozie server(s) pick up the latest version 
of sharelib present
 under oozie.service.WorkflowAppService.system.libpath directory based on the 
sharelib directory timestamp or reloads
 the sharelib metafile if one is configured. The main purpose is to update the 
sharelib on the oozie server without restarting.
 
@@ -457,18 +457,30 @@ GET /oozie/v2/admin/update_sharelib
 <verbatim>
 HTTP/1.1 200 OK
 Content-Type: application/json;charset=UTF-8
-
-{
- {
-    "sharelibUpdate":
-        {
-            
"sharelibDirOld":"hdfs://localhost:9000/user/purushah/share/lib/lib_20131114095729",
-            
"sharelibDirNew":"hdfs://localhost:9000/user/purushah/share/lib/lib_20131120163343",
-            "host":"localhost:11000",
-            "status":"Successful"
-         }
+[
+    {
+       "sharelibUpdate":{
+          "host":"server1",
+          "status":"Server not found"
        }
-}
+    },
+    {
+       "sharelibUpdate":{
+          "host":"server2",
+          "status":"Successful",
+          
"sharelibDirOld":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218",
+          
"sharelibDirNew":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218"
+       }
+    },
+    {
+       "sharelibUpdate":{
+          "host":"server3",
+          "status":"Successful",
+          
"sharelibDirOld":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218",
+          
"sharelibDirNew":"hdfs://localhost:51951/user/purushah/share/lib/lib_20140107181218"
+       }
+    }
+]
 </verbatim>
 
 ---+++ Job and Jobs End-Points

http://git-wip-us.apache.org/repos/asf/oozie/blob/2a2e2139/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 35726b9..7bf006e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1609 HA support for share lib. (puru via rkanter)
 OOZIE-1622 Multiple CoordSubmit for same bundle (shwethags via virag)
 OOZIE-1644 Default config from config-default.xml is not propagated to actions 
(mona)
 OOZIE-1645 Oozie upgrade DB command fails due to missing dependencies for 
mssql (omaliuvanchuk via rkanter)

Reply via email to