Repository: oozie Updated Branches: refs/heads/master 844b067b3 -> 65e2109df
OOZIE-1689 HA support for OOZIE-7(Ability to view the log information corresponding to particular coordinator action) (puru via mona) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/65e2109d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/65e2109d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/65e2109d Branch: refs/heads/master Commit: 65e2109dfa8a19ece782babcea8e110d8bdb6814 Parents: 844b067 Author: mona <[email protected]> Authored: Tue May 20 16:29:25 2014 -0700 Committer: mona <[email protected]> Committed: Tue May 20 16:29:25 2014 -0700 ---------------------------------------------------------------------- .../oozie/service/ZKXLogStreamingService.java | 7 +- .../org/apache/oozie/util/AuthUrlClient.java | 19 +++ .../service/TestZKXLogStreamingService.java | 128 ++++++++++++++++++- release-log.txt | 1 + 4 files changed, 149 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java index 7d53378..6258d40 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java @@ -113,7 +113,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv } // Otherwise, we have to go collate relevant logs from the other Oozie servers else { - collateLogs(filter, startTime, endTime, writer); + collateLogs(filter, startTime, endTime, writer, params); } } else { @@ -133,7 +133,8 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv * @param writer * @throws IOException */ - private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer) throws IOException { + private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer, + Map<String, String[]> params) throws IOException { XLogService xLogService = Services.get().get(XLogService.class); List<String> badOozies = new ArrayList<String>(); List<ServiceInstance<Map>> oozies = null; @@ -164,7 +165,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG - + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"; + + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params); BufferedReader reader = AuthUrlClient.callServer(url); parsers.add(new SimpleTimestampedMessageParser(reader, filter)); http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java index 79f8883..f70eb2e 100644 --- a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java +++ b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java @@ -25,6 +25,7 @@ import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.security.PrivilegedExceptionAction; +import java.util.Map; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; @@ -131,4 +132,22 @@ public class AuthUrlClient { } return reader; } + + public static String getQueryParamString(Map<String, String[]> params) { + StringBuilder stringBuilder = new StringBuilder(); + if (params == null || params.isEmpty()) { + return ""; + } + for (String key : params.keySet()) { + if (!key.isEmpty() && params.get(key).length > 0) { + stringBuilder.append("&"); + String value = params.get(key)[0]; // We don't support multi value. + stringBuilder.append(key); + stringBuilder.append("="); + stringBuilder.append(value); + } + } + return stringBuilder.toString(); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java index fd48951..5d8d69f 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java @@ -22,12 +22,16 @@ import java.io.FileOutputStream; import java.io.FileWriter; import java.io.InputStream; import java.io.StringWriter; +import java.util.Date; import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.apache.commons.logging.LogFactory; +import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.test.EmbeddedServletContainer; import org.apache.oozie.test.ZKXTestCase; +import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.XLogFilter; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.ZKUtils; @@ -192,14 +196,18 @@ public class TestZKXLogStreamingService extends ZKXTestCase { } protected String doStreamLog(XLogFilter xf) throws Exception { + return doStreamLog(xf, new HashMap<String, String[]>()); + } + + protected String doStreamLog(XLogFilter xf, Map<String, String[]> param) throws Exception { StringWriter w = new StringWriter(); ZKXLogStreamingService zkxlss = new ZKXLogStreamingService(); try { - Services services=Services.get(); + Services services = Services.get(); services.setService(ZKJobsConcurrencyService.class); zkxlss.init(services); - sleep(1000); // Sleep to allow ZKUtils ServiceCache to update - zkxlss.streamLog(xf, null, null, w, new HashMap<String, String[]>()); + sleep(1000); // Sleep to allow ZKUtils ServiceCache to update + zkxlss.streamLog(xf, null, null, w, param); } finally { zkxlss.destroy(); @@ -340,4 +348,118 @@ public class TestZKXLogStreamingService extends ZKXTestCase { container.stop(); } } + public void testStreamingWithMultipleOozieServers_coordActionList() throws Exception { + XLogFilter.reset(); + + File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties"); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties"); + Properties log4jProps = new Properties(); + log4jProps.load(is); + // prevent conflicts with other tests by changing the log file location + log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log"); + log4jProps.store(new FileOutputStream(log4jFile), ""); + setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName()); + Services.get().get(XLogService.class).init(Services.get()); + + File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(), Services.get() + .get(XLogService.class).getOozieLogName()); + logFile.getParentFile().mkdirs(); + FileWriter logWriter = new FileWriter(logFile); + // local logs + StringBuffer bf = new StringBuffer(); + bf.append( + "2014-02-06 00:26:56,126 DEBUG CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] " + + "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] " + + "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n") + .append("2014-02-06 00:26:56,150 INFO CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] " + + "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] " + + "[0000003-140205233038063-oozie-oozi-C@1]::ActionInputCheck:: File::8020/user/purushah/examples/input-data/" + + "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L1_") + .append("\n") + .append("2014-02-06 00:27:56,126 DEBUG CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] " + + "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@2] " + + "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n") + .append("2014-02-06 00:27:56,150 INFO CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] " + + "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@2] " + + "[0000003-140205233038063-oozie-oozi-C@2]::ActionInputCheck:: File::8020/user/purushah/examples/input-data/" + + "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L2_") + .append("\n"); + logWriter.append(bf); + + logWriter.close(); + + XLogFilter.reset(); + XLogFilter.defineParameter("USER"); + XLogFilter.defineParameter("GROUP"); + XLogFilter.defineParameter("TOKEN"); + XLogFilter.defineParameter("APP"); + XLogFilter.defineParameter("JOB"); + XLogFilter.defineParameter("ACTION"); + + XLogFilter xf = new XLogFilter(); + + xf.setLogLevel("DEBUG|INFO"); + xf.setParameter("USER", ".*"); + xf.setParameter("GROUP", ".*"); + xf.setParameter("TOKEN", ".*"); + xf.setParameter("APP", ".*"); + xf.setParameter("JOB", "0000003-140205233038063-oozie-oozi-C"); + xf.setParameter(DagXLogInfoService.ACTION, "0000003-140205233038063-oozie-oozi-C@1"); + + String out = doStreamLog(xf); + String[] outArr = out.split("\n"); + assertEquals(2, outArr.length); + assertTrue(out.contains("_L1_")); + assertFalse(out.contains("_L2_")); + + // We'll use a DummyZKOozie to create an entry in ZK and then set its + // url to an (unrelated) servlet that will simply return + // some log messages + DummyZKOozie dummyOozie = null; + EmbeddedServletContainer container = new EmbeddedServletContainer("oozie"); + container.addServletEndpoint("/other-oozie-server/*", DummyLogStreamingServlet.class); + try { + container.start(); + dummyOozie = new DummyZKOozie("9876", container.getServletURL("/other-oozie-server/*")); + DummyLogStreamingServlet.logs = ""; + + DummyLogStreamingServlet.lastQueryString = null; + Map<String, String[]> param = new HashMap<String, String[]>(); + param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { RestConstants.JOB_LOG_ACTION }); + param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { "1" }); + out = doStreamLog(xf, param); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false" )); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + RestConstants.JOB_LOG_ACTION )); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM + "=1" )); + + param.clear(); + param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { RestConstants.JOB_LOG_ACTION }); + param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { "1-4,5" }); + out = doStreamLog(xf, param); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false" )); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + RestConstants.JOB_LOG_ACTION )); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM + "=1-4,5" )); + + param.clear(); + Date endDate = new Date(); + Date createdDate = new Date(endDate.getTime() / 2); + String date = DateUtils.formatDateOozieTZ(createdDate) + "::" + DateUtils.formatDateOozieTZ(endDate); + param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { RestConstants.JOB_LOG_DATE }); + param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { date }); + out = doStreamLog(xf, param); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false" )); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + RestConstants.JOB_LOG_DATE )); + assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM + "=" + date )); + + container.stop(); + } + finally { + if (dummyOozie != null) { + dummyOozie.teardown(); + } + container.stop(); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/65e2109d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 8759dad..8940746 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1689 HA support for OOZIE-7(Ability to view the log information corresponding to particular coordinator action) (puru via mona) OOZIE-1849 If the underlying job finishes while a Workflow is suspended, Oozie can take a while to realize it (rkanter) OOZIE-1835 NullPointerException from SLAEmailEventListener (rkanter) OOZIE-1809 offset and len options are ignored in oozie job -info for workflow (ryota)
