Repository: oozie
Updated Branches:
  refs/heads/master 844b067b3 -> 65e2109df


OOZIE-1689 HA support for OOZIE-7(Ability to view the log information 
corresponding to particular coordinator action) (puru via mona)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/65e2109d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/65e2109d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/65e2109d

Branch: refs/heads/master
Commit: 65e2109dfa8a19ece782babcea8e110d8bdb6814
Parents: 844b067
Author: mona <[email protected]>
Authored: Tue May 20 16:29:25 2014 -0700
Committer: mona <[email protected]>
Committed: Tue May 20 16:29:25 2014 -0700

----------------------------------------------------------------------
 .../oozie/service/ZKXLogStreamingService.java   |   7 +-
 .../org/apache/oozie/util/AuthUrlClient.java    |  19 +++
 .../service/TestZKXLogStreamingService.java     | 128 ++++++++++++++++++-
 release-log.txt                                 |   1 +
 4 files changed, 149 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index 7d53378..6258d40 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -113,7 +113,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
             }
             // Otherwise, we have to go collate relevant logs from the other Oozie servers
             else {
-                collateLogs(filter, startTime, endTime, writer);
+                collateLogs(filter, startTime, endTime, writer, params);
             }
         }
         else {
@@ -133,7 +133,8 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
      * @param writer
      * @throws IOException
      */
-    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer) throws IOException {
+    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer,
+            Map<String, String[]> params) throws IOException {
         XLogService xLogService = Services.get().get(XLogService.class);
         List<String> badOozies = new ArrayList<String>();
         List<ServiceInstance<Map>> oozies = null;
@@ -164,7 +165,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
                     // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
                         final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
                                 + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG
-                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false";
+                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params);
 
                         BufferedReader reader = AuthUrlClient.callServer(url);
                         parsers.add(new SimpleTimestampedMessageParser(reader, filter));

http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
index 79f8883..f70eb2e 100644
--- a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
+++ b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@@ -131,4 +132,22 @@ public class AuthUrlClient {
         }
         return reader;
     }
+
+    public static String getQueryParamString(Map<String, String[]> params) {
+        StringBuilder stringBuilder = new StringBuilder();
+        if (params == null || params.isEmpty()) {
+            return "";
+        }
+        for (String key : params.keySet()) {
+            if (!key.isEmpty() && params.get(key).length > 0) {
+                stringBuilder.append("&");
+                String value = params.get(key)[0]; // We don't support multi value.
+                stringBuilder.append(key);
+                stringBuilder.append("=");
+                stringBuilder.append(value);
+            }
+        }
+        return stringBuilder.toString();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index fd48951..5d8d69f 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -22,12 +22,16 @@ import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.logging.LogFactory;
+import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.test.EmbeddedServletContainer;
 import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.ZKUtils;
@@ -192,14 +196,18 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
     }
 
     protected String doStreamLog(XLogFilter xf) throws Exception {
+        return doStreamLog(xf, new HashMap<String, String[]>());
+    }
+
+    protected String doStreamLog(XLogFilter xf, Map<String, String[]> param) throws Exception {
         StringWriter w = new StringWriter();
         ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
         try {
-            Services services=Services.get();
+            Services services = Services.get();
             services.setService(ZKJobsConcurrencyService.class);
             zkxlss.init(services);
-            sleep(1000);    // Sleep to allow ZKUtils ServiceCache to update
-            zkxlss.streamLog(xf, null, null, w, new HashMap<String, String[]>());
+            sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
+            zkxlss.streamLog(xf, null, null, w, param);
         }
         finally {
             zkxlss.destroy();
@@ -340,4 +348,118 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
             container.stop();
         }
     }
+    public void testStreamingWithMultipleOozieServers_coordActionList() throws Exception {
+        XLogFilter.reset();
+
+        File log4jFile = new File(getTestCaseConfDir(), 
"test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = 
cl.getResourceAsStream("test-no-dash-log4j.properties");
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + 
"/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+        Services.get().get(XLogService.class).init(Services.get());
+
+        File logFile = new 
File(Services.get().get(XLogService.class).getOozieLogPath(), Services.get()
+                .get(XLogService.class).getOozieLogName());
+        logFile.getParentFile().mkdirs();
+        FileWriter logWriter = new FileWriter(logFile);
+        // local logs
+        StringBuffer bf = new StringBuffer();
+        bf.append(
+                "2014-02-06 00:26:56,126 DEBUG 
CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+                     + "TOKEN[-] APP[-] 
JOB[0000003-140205233038063-oozie-oozi-C] 
ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+                     + "checking for the file 
~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
+                .append("2014-02-06 00:26:56,150  INFO 
CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+                     + "TOKEN[-] APP[-] 
JOB[0000003-140205233038063-oozie-oozi-C] 
ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+                     + 
"[0000003-140205233038063-oozie-oozi-C@1]::ActionInputCheck:: 
File::8020/user/purushah/examples/input-data/"
+                     + "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + 
"Action updated in DB! _L1_")
+                .append("\n")
+                .append("2014-02-06 00:27:56,126 DEBUG 
CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+                     + "TOKEN[-] APP[-] 
JOB[0000003-140205233038063-oozie-oozi-C] 
ACTION[0000003-140205233038063-oozie-oozi-C@2] "
+                     + "checking for the file 
~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
+                .append("2014-02-06 00:27:56,150  INFO 
CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+                     + "TOKEN[-] APP[-] 
JOB[0000003-140205233038063-oozie-oozi-C] 
ACTION[0000003-140205233038063-oozie-oozi-C@2] "
+                     + 
"[0000003-140205233038063-oozie-oozi-C@2]::ActionInputCheck:: 
File::8020/user/purushah/examples/input-data/"
+                     + "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + 
"Action updated in DB! _L2_")
+                .append("\n");
+        logWriter.append(bf);
+
+        logWriter.close();
+
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+
+        XLogFilter xf = new XLogFilter();
+
+        xf.setLogLevel("DEBUG|INFO");
+        xf.setParameter("USER", ".*");
+        xf.setParameter("GROUP", ".*");
+        xf.setParameter("TOKEN", ".*");
+        xf.setParameter("APP", ".*");
+        xf.setParameter("JOB", "0000003-140205233038063-oozie-oozi-C");
+        xf.setParameter(DagXLogInfoService.ACTION, 
"0000003-140205233038063-oozie-oozi-C@1");
+
+        String out = doStreamLog(xf);
+        String[] outArr = out.split("\n");
+        assertEquals(2, outArr.length);
+        assertTrue(out.contains("_L1_"));
+        assertFalse(out.contains("_L2_"));
+
+        // We'll use a DummyZKOozie to create an entry in ZK and then set its
+        // url to an (unrelated) servlet that will simply return
+        // some log messages
+        DummyZKOozie dummyOozie = null;
+        EmbeddedServletContainer container = new 
EmbeddedServletContainer("oozie");
+        container.addServletEndpoint("/other-oozie-server/*", 
DummyLogStreamingServlet.class);
+        try {
+            container.start();
+            dummyOozie = new DummyZKOozie("9876", 
container.getServletURL("/other-oozie-server/*"));
+            DummyLogStreamingServlet.logs = "";
+
+            DummyLogStreamingServlet.lastQueryString = null;
+            Map<String, String[]> param = new HashMap<String, String[]>();
+            param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { 
RestConstants.JOB_LOG_ACTION });
+            param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { "1" 
});
+            out = doStreamLog(xf, param);
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false"
 ));
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + 
RestConstants.JOB_LOG_ACTION ));
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM
 + "=1" ));
+
+            param.clear();
+            param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { 
RestConstants.JOB_LOG_ACTION });
+            param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { 
"1-4,5" });
+            out = doStreamLog(xf, param);
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false"
 ));
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + 
RestConstants.JOB_LOG_ACTION ));
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM
 + "=1-4,5" ));
+
+            param.clear();
+            Date endDate = new Date();
+            Date createdDate = new Date(endDate.getTime() / 2);
+            String date = DateUtils.formatDateOozieTZ(createdDate) + "::" + 
DateUtils.formatDateOozieTZ(endDate);
+            param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { 
RestConstants.JOB_LOG_DATE });
+            param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { date 
});
+            out = doStreamLog(xf, param);
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false"
 ));
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + 
RestConstants.JOB_LOG_DATE ));
+            
assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM
 + "=" + date ));
+
+            container.stop();
+        }
+        finally {
+            if (dummyOozie != null) {
+                dummyOozie.teardown();
+            }
+            container.stop();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8759dad..8940746 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1689 HA support for OOZIE-7(Ability to view the log information corresponding to particular coordinator action) (puru via mona)
 OOZIE-1849 If the underlying job finishes while a Workflow is suspended, Oozie can take a while to realize it (rkanter)
 OOZIE-1835 NullPointerException from SLAEmailEventListener (rkanter)
 OOZIE-1809 offset and len options are ignored in oozie job -info for workflow (ryota)

Reply via email to