Modified: oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java Thu Aug 29 18:06:57 2013 @@ -53,9 +53,9 @@ public class TestLogStreamer extends XTe // between the start and end times of the job FileWriter fw1 = new FileWriter(getTestCaseDir() + "/oozie.log"); StringBuilder sb1 = new StringBuilder(); - sb1.append("2009-06-24 02:43:13,958 DEBUG _L1_:323 -" + logStatement + "End workflow state change"); - sb1.append("\n2009-06-24 02:43:13,961 INFO _L2_:317 -" + logStatement - + "[org.apache.oozie.core.command.WorkflowRunnerCallable] " + "released lock"); + sb1.append("2009-06-24 02:43:13,958 DEBUG _L1_:323 -" + logStatement + "End workflow state change\n"); + sb1.append("2009-06-24 02:43:13,961 INFO _L2_:317 -" + logStatement + + "[org.apache.oozie.core.command.WorkflowRunnerCallable] " + "released lock\n"); fw1.write(sb1.toString()); fw1.close(); File f1 = new File(getTestCaseDir() + "/oozie.log"); @@ -65,11 +65,11 @@ public class TestLogStreamer extends XTe // between the start and end times of the job FileWriter fw2 = new FileWriter(getTestCaseDir() + "/oozie.log.1"); StringBuilder sb2 = new StringBuilder(); - sb2.append("\n2009-06-24 02:43:13,986 WARN _L3_:539 -" + logStatement + "Use GenericOptionsParser for parsing " + "the " + sb2.append("2009-06-24 02:43:13,986 WARN _L3_:539 -" + logStatement + "Use GenericOptionsParser for parsing " + "the " + "arguments. " + "\n" + "_L3A_Applications " - + "should implement Tool for the same. \n_L3B_Multi line test"); - sb2.append("\n2009-06-24 02:43:14,431 INFO _L4_:661 -" + logStatement + "No job jar file set. User classes " - + "may not be found. " + "See JobConf(Class) or JobConf#setJar(String)."); + + "should implement Tool for the same. \n_L3B_Multi line test\n"); + sb2.append("2009-06-24 02:43:14,431 INFO _L4_:661 -" + logStatement + "No job jar file set. User classes " + + "may not be found. " + "See JobConf(Class) or JobConf#setJar(String).\n"); fw2.write(sb2.toString()); fw2.close(); File f2 = new File(getTestCaseDir() + "/oozie.log.1"); @@ -79,11 +79,11 @@ public class TestLogStreamer extends XTe // between the start and end times of the job FileWriter fw3 = new FileWriter(getTestCaseDir() + "/oozie.log.2"); StringBuilder sb3 = new StringBuilder(); - sb3.append("\n2009-06-24 02:43:14,505 INFO _L5_:317 - USER[oozie] GROUP[oozie] TOKEN[-] APP[-] JOB[-] " - + "ACTION[-] Released Lock"); - sb3.append("\n2009-06-24 02:43:19,344 DEBUG _L6_:323 - USER[oozie] GROUP[oozie] TOKEN[MYtoken] APP[-] JOB[-] " - + "ACTION[-] Number of pending signals to check [0]"); - sb3.append("\n2009-06-24 02:43:29,151 DEBUG _L7_:323 -" + logStatement + "Number of pending actions [0] "); + sb3.append("2009-06-24 02:43:14,505 INFO _L5_:317 - USER[oozie] GROUP[oozie] TOKEN[-] APP[-] JOB[-] " + + "ACTION[-] Released Lock\n"); + sb3.append("2009-06-24 02:43:19,344 DEBUG _L6_:323 - USER[oozie] GROUP[oozie] TOKEN[MYtoken] APP[-] JOB[-] " + + "ACTION[-] Number of pending signals to check [0]\n"); + sb3.append("2009-06-24 02:43:29,151 DEBUG _L7_:323 -" + logStatement + "Number of pending actions [0] \n"); fw3.write(sb3.toString()); fw3.close(); File f3 = new File(getTestCaseDir() + "/oozie.log.2"); @@ -94,9 +94,9 @@ public class TestLogStreamer extends XTe // "oozie.log" FileWriter fwerr = new FileWriter(getTestCaseDir() + "/testerr.log"); StringBuilder sberr = new StringBuilder(); - sberr.append("2009-06-24 02:43:13,958 WARN _L1_:323 -" + logStatement + "End workflow state change"); - sberr.append("\n2009-06-24 02:43:13,961 INFO _L2_:317 -" + logStatement - + "[org.apache.oozie.core.command.WorkflowRunnerCallable] " + "released lock"); + sberr.append("2009-06-24 02:43:13,958 WARN _L1_:323 -" + logStatement + "End workflow state change\n"); + sberr.append("2009-06-24 02:43:13,961 INFO _L2_:317 -" + logStatement + + "[org.apache.oozie.core.command.WorkflowRunnerCallable] " + "released lock\n"); fwerr.write(sberr.toString()); fwerr.close(); File ferr = new File(getTestCaseDir() + "/testerr.log"); @@ -109,9 +109,9 @@ public class TestLogStreamer extends XTe String outFilename = "oozie.log-" + filenameDateFormatter.format(new Date(currTime - 6 * 3600000)) + ".gz"; File f = new File(getTestCaseDir() + "/" + outFilename); StringBuilder sb = new StringBuilder(); - sb.append("\n2009-06-24 02:43:13,958 DEBUG _L8_:323 -" + logStatement + "End workflow state change"); - sb.append("\n2009-06-24 02:43:13,961 INFO _L9_:317 -" + logStatement + "[org.apache.oozie.core." - + "command.WorkflowRunnerCallable] " + "released lock"); + sb.append("2009-06-24 02:43:13,958 DEBUG _L8_:323 -" + logStatement + "End workflow state change\n"); + sb.append("2009-06-24 02:43:13,961 INFO _L9_:317 -" + logStatement + "[org.apache.oozie.core." + + "command.WorkflowRunnerCallable] " + "released lock\n"); writeToGZFile(f,sb); // oozie.log.gz GZip file would always be included in list of files for log retrieval @@ -119,9 +119,9 @@ public class TestLogStreamer extends XTe f = new File(getTestCaseDir() + "/" + outFilename); // Generate and write log content to the GZip file sb = new StringBuilder(); - sb.append("\n2009-06-24 02:43:13,958 DEBUG _L10_:323 -" + logStatement + "End workflow state change"); - sb.append("\n2009-06-24 02:43:13,961 INFO _L11_:317 -" + logStatement + "[org.apache.oozie.core." - + "command.WorkflowRunnerCallable] " + "released lock"); + sb.append("2009-06-24 02:43:13,958 DEBUG _L10_:323 -" + logStatement + "End workflow state change\n"); + sb.append("2009-06-24 02:43:13,961 INFO _L11_:317 -" + logStatement + "[org.apache.oozie.core." + + "command.WorkflowRunnerCallable] " + "released lock\n"); writeToGZFile(f,sb); // Test to check if an invalid GZip file(file name not in the expected format oozie.log-YYYY-MM-DD-HH.gz) is @@ -130,16 +130,16 @@ public class TestLogStreamer extends XTe f = new File(getTestCaseDir() + "/" + outFilename); // Generate and write log content to the GZip file sb = new StringBuilder(); - sb.append("\n2009-06-24 02:43:13,958 DEBUG _L12_:323 -" + logStatement + "End workflow state change"); - sb.append("\n2009-06-24 02:43:13,961 INFO _L13_:317 -" + logStatement + "[org.apache.oozie.core." - + "command.WorkflowRunnerCallable] " + "released lock"); + sb.append("2009-06-24 02:43:13,958 DEBUG _L12_:323 -" + logStatement + "End workflow state change\n"); + sb.append("2009-06-24 02:43:13,961 INFO _L13_:317 -" + logStatement + "[org.apache.oozie.core." + + "command.WorkflowRunnerCallable] " + "released lock\n"); writeToGZFile(f,sb); // Test for the log retrieval of the job that began 10 hours before and ended 5 hours before current time // respectively StringWriter sw = new StringWriter(); - XLogStreamer str = new XLogStreamer(xf, sw, getTestCaseDir(), "oozie.log", 1); - str.streamLog(new Date(currTime - 10 * 3600000), new Date(currTime - 5 * 3600000)); + XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1); + str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime - 5 * 3600000)); String[] out = sw.toString().split("\n"); // Check if the retrieved log content is of length seven lines after filtering based on time window, file name // pattern and parameters like JobId, Username etc. and/or based on log level like INFO, DEBUG, etc. @@ -156,8 +156,8 @@ public class TestLogStreamer extends XTe // Test to check if the null values for startTime and endTime are translated to 0 and current time respectively // and corresponding log content is retrieved properly StringWriter sw1 = new StringWriter(); - XLogStreamer str1 = new XLogStreamer(xf, sw1, getTestCaseDir(), "oozie.log", 1); - str1.streamLog(null, null); + XLogStreamer str1 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1); + str1.streamLog(sw1, null, null); out = sw1.toString().split("\n"); // Check if the retrieved log content is of length eight lines after filtering based on time window, file name // pattern and parameters like JobId, Username etc. and/or based on log level like INFO, DEBUG, etc. @@ -220,11 +220,11 @@ public class TestLogStreamer extends XTe // Test for the log retrieval of the job spanning multiple hours StringWriter sw2 = new StringWriter(); - XLogStreamer str2 = new XLogStreamer(xf, sw2, getTestCaseDir(), "oozie.log", 1); + XLogStreamer str2 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1); Calendar calendarEntry = Calendar.getInstance(); // Setting start-time to 2012-04-24-19 for log stream (month-1 passed as parameter since 0=January), and end time is current time calendarEntry.set(2012, 3, 24, 19, 0); - str2.streamLog(calendarEntry.getTime(), new Date(System.currentTimeMillis())); + str2.streamLog(sw2, calendarEntry.getTime(), new Date(System.currentTimeMillis())); String[] out = sw2.toString().split("\n"); // Check if the retrieved log content is of length five lines after filtering based on time window, file name @@ -272,8 +272,8 @@ public class TestLogStreamer extends XTe f1.setLastModified(currTime); StringWriter sw = new StringWriter(); - XLogStreamer str = new XLogStreamer(xf, sw, getTestCaseDir(), "oozie.log", 1); - str.streamLog(new Date(currTime - 5000), new Date(currTime + 5000)); + XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1); + str.streamLog(sw, new Date(currTime - 5000), new Date(currTime + 5000)); String[] out = sw.toString().split("\n"); // Check if the retrieved log content is of length five lines after filtering; we expect the first five lines because the // filtering shouldn't care whether or not there is a dash while the last five lines don't pass the normal filtering @@ -286,7 +286,7 @@ public class TestLogStreamer extends XTe assertEquals(true, out[4].contains("_L5_")); } - private void writeToGZFile(File f, StringBuilder sbr) throws IOException { + static void writeToGZFile(File f, StringBuilder sbr) throws IOException { GZIPOutputStream gzout = new GZIPOutputStream(new FileOutputStream(f)); String strg = sbr.toString(); // Write log content to the GZip file
Added: oozie/trunk/core/src/test/java/org/apache/oozie/util/TestMultiFileReader.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestMultiFileReader.java?rev=1518739&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestMultiFileReader.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestMultiFileReader.java Thu Aug 29 18:06:57 2013 @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + + +import java.io.File; +import java.io.FileWriter; +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.oozie.test.XTestCase; + +public class TestMultiFileReader extends XTestCase { + + public void testOneFile() throws Exception { + File dir = new File(getTestCaseDir()); + File f1 = new File(dir, "file1.txt"); + FileWriter fw = new FileWriter(f1); + String str1 = "This is the first line of the only file\nThis is the second line\n"; + fw.write(str1); + fw.close(); + ArrayList<File> files = new ArrayList<File>(); + files.add(f1); + + MultiFileReader reader = new MultiFileReader(files); + char[] buf = new char[str1.length()]; + Arrays.fill(buf, '+'); + int numRead = reader.read(buf, 0, 10); + assertEquals(10, numRead); + assertEquals(str1.substring(0, 10), new String(buf, 0, 10)); + for (int i = 10; i < buf.length; i++) { + assertEquals('+', buf[i]); + } + numRead = reader.read(buf, 10, str1.length() - 10); + assertEquals(str1.length() - 10, numRead); + assertEquals(str1, new String(buf)); + numRead = reader.read(); + assertEquals(-1, numRead); + } + + public void testMultipleFiles() throws Exception { + File dir = new File(getTestCaseDir()); + File f1 = new File(dir, "file1.txt"); + FileWriter fw = new FileWriter(f1); + String str1 = "This is the first line of the first file\nThis is the second line\n"; + fw.write(str1); + fw.close(); + File f2 = new File(dir, "file2.gz"); + String str2 = "This is a gz file with just one line\n"; + TestLogStreamer.writeToGZFile(f2, new StringBuilder(str2)); + File f3 = new File(dir, "file3.txt"); + fw = new FileWriter(f3); + String str3 = "And this is the last file\nwith\n3 lines\n"; + fw.write(str3); + fw.close(); + ArrayList<File> files = new ArrayList<File>(); + files.add(f1); + files.add(f2); + files.add(f3); + + MultiFileReader reader = new MultiFileReader(files); + char[] buf = new char[str1.length() + str2.length() + str3.length()]; + Arrays.fill(buf, '+'); + // Try reading longer than the first file; MultiFileReader doesn't read past it and will simply return less chars + // Reading again should get the next chars from the second file + int numRead = reader.read(buf, 0, str1.length() + 10); + assertEquals(str1.length(), numRead); + assertEquals(str1, new String(buf, 0, str1.length())); + for (int i = str1.length(); i < buf.length; i++) { + assertEquals('+', buf[i]); + } + numRead = reader.read(buf, str1.length(), str2.length()); + assertEquals(str2.length(), numRead); + assertEquals(str1 + str2, new String(buf, 0, str1.length() + str2.length())); + for (int i = str1.length() + str2.length(); i < buf.length; i++) { + assertEquals('+', buf[i]); + } + numRead = reader.read(buf, str1.length() + str2.length(), str3.length()); + assertEquals(str3.length(), numRead); + assertEquals(str1 + str2 + str3, new String(buf)); + numRead = reader.read(); + assertEquals(-1, numRead); + } +} Added: oozie/trunk/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java?rev=1518739&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java Thu Aug 29 18:06:57 2013 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.StringWriter; +import org.apache.oozie.test.XTestCase; + +public class TestSimplifiedTimestampedMessageParser extends XTestCase { + + public void testProcessRemainingLog() throws IOException { + XLogStreamer.Filter.reset(); + XLogStreamer.Filter.defineParameter("USER"); + XLogStreamer.Filter.defineParameter("GROUP"); + XLogStreamer.Filter.defineParameter("TOKEN"); + XLogStreamer.Filter.defineParameter("APP"); + XLogStreamer.Filter.defineParameter("JOB"); + XLogStreamer.Filter.defineParameter("ACTION"); + XLogStreamer.Filter xf = new XLogStreamer.Filter(); + xf.setParameter("JOB", "14-200904160239--example-forkjoinwf"); + xf.setLogLevel("DEBUG|WARN"); + + File file = TestTimestampedMessageParser.prepareFile1(getTestCaseDir()); + StringWriter sw = new StringWriter(); + new SimpleTimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw); + String[] out = sw.toString().split("\n"); + assertEquals(19, out.length); + assertTrue(out[0].contains("_L1_")); + assertTrue(out[1].contains("_L2_")); + assertTrue(out[2].contains("_L3_")); + assertTrue(out[3].contains("_L3A_")); + assertTrue(out[4].contains("_L3B_")); + assertTrue(out[5].contains("_L4_")); + assertTrue(out[6].contains("_L5_")); + assertTrue(out[7].contains("_L6_")); + assertTrue(out[8].contains("_L7_")); + assertTrue(out[9].contains("_L8_")); + assertTrue(out[10].contains("_L9_")); + assertTrue(out[11].contains("_L10_")); + assertTrue(out[12].contains("_L11_")); + assertTrue(out[13].contains("_L12_")); + assertTrue(out[14].contains("_L13_")); + assertTrue(out[15].contains("_L14_")); + assertTrue(out[16].contains("_L15_")); + assertTrue(out[17].contains("_L16_")); + assertTrue(out[18].contains("_L17_")); + } + + public void testProcessRemainingCoordinatorLogForActions() throws IOException { + XLogStreamer.Filter.reset(); + XLogStreamer.Filter.defineParameter("USER"); + XLogStreamer.Filter.defineParameter("GROUP"); + XLogStreamer.Filter.defineParameter("TOKEN"); + XLogStreamer.Filter.defineParameter("APP"); + XLogStreamer.Filter.defineParameter("JOB"); + XLogStreamer.Filter.defineParameter("ACTION"); + XLogStreamer.Filter xf = new XLogStreamer.Filter(); + xf.setParameter("JOB", "14-200904160239--example-C"); + xf.setParameter("ACTION", "14-200904160239--example-C@1"); + + File file = TestTimestampedMessageParser.prepareFile2(getTestCaseDir()); + StringWriter sw = new StringWriter(); + new SimpleTimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw); + String[] matches = sw.toString().split("\n"); + assertEquals(9, matches.length); + assertTrue(matches[0].contains("_L1_")); + assertTrue(matches[1].contains("_L2_")); + assertTrue(matches[2].contains("_L3_")); + assertTrue(matches[3].contains("_L3A_")); + assertTrue(matches[4].contains("_L3B_")); + assertTrue(matches[5].contains("_L4_")); + assertTrue(matches[6].contains("_L5_")); + assertTrue(matches[7].contains("_L6_")); + assertTrue(matches[8].contains("_L7_")); + } +} Added: oozie/trunk/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java?rev=1518739&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java Thu Aug 29 18:06:57 2013 @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import org.apache.oozie.test.XTestCase; + +public class TestTimestampedMessageParser extends XTestCase { + + static File prepareFile1(String dir) throws IOException { + File file = new File(dir + "/test1.log"); + FileWriter fw = new FileWriter(file); + StringBuilder sb = new StringBuilder(); + sb.append("2009-06-24 02:43:13,958 DEBUG _L1_:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-forkjoinwf] ACTION[-] End workflow state change"); + sb.append("\n2009-06-24 02:43:13,961 INFO _L2_:317 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-forkjoinwf] ACTION[-] " + + "[org.apache.oozie.core.command.WorkflowRunnerCallable] " + "released lock"); + sb.append("\n2009-06-24 02:43:13,986 WARN _L3_:539 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-forkjoinwf] ACTION[-] Use GenericOptionsParser for parsing " + + "the arguments. " + "\n_L3A_Applications should implement Tool for the same. \n_L3B_Multi line test"); + sb.append("\n2009-06-24 02:43:14,431 WARN _L4_:661 - No job jar file set. User classes may not be found. " + + "See JobConf(Class) or JobConf#setJar(String)."); + sb.append("\n2009-06-24 02:43:14,505 INFO _L5_:317 - USER[oozie] GROUP[oozie] TOKEN[-] APP[-] JOB[-] " + + "ACTION[-] " + "Released Lock"); + sb.append("\n2009-06-24 02:43:19,344 DEBUG _L6_:323 - USER[oozie] GROUP[oozie] TOKEN[MYtoken] APP[-] " + + "JOB[-] ACTION[-] Number of pending signals to check [0]"); + sb.append("\n2009-06-24 02:43:29,151 DEBUG _L7_:323 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] " + + "Number of pending actions [0] "); + // a multiline message with a stack trace + sb.append("\n2013-06-10 10:26:30,202 WARN ActionStartXCommand:542 - USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] " + + "JOB[14-200904160239--example-forkjoinwf] ACTION[14-200904160239--example-forkjoinwf@hive-node] Error starting " + + "action [hive-node]. ErrorType [TRANSIENT], ErrorCode [JA009], Message [JA009: java.io.IOException: Unknown " + + "protocol to name node: org.apache.hadoop.mapred.JobSubmissionProtocol _L8_\n" + + " at org.apache.hadoop.hdfs.server.namenode.NameNode.getProtocolVersion(NameNode.java:156) _L9_\n" + + " at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_L10_\n" + + " at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190) _L11_\n" + + " at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426) _L12_\n" + + "] _L13_\n" + + "org.apache.oozie.action.ActionExecutorException: JA009: java.io.IOException: Unknown protocol to name node: " + + "org.apache.hadoop.mapred.JobSubmissionProtocol _L14_\n" + + " at org.apache.hadoop.hdfs.server.namenode.NameNode.getProtocolVersion(NameNode.java:156) _L15_\n" + + " at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) _L16_\n" + + " at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) _L17_\n"); + + fw.write(sb.toString()); + fw.close(); + return file; + } + + static File prepareFile2(String dir) throws IOException { + File file = new File(dir + "/test2.log"); + FileWriter fw = new FileWriter(file); + StringBuilder sb = new StringBuilder(); + sb.append("2009-06-24 02:43:13,958 DEBUG _L1_:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-C] ACTION[14-200904160239--example-C@1] End workflow state change"); + sb.append("\n2009-06-24 02:43:13,961 INFO _L2_:317 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-C] ACTION[14-200904160239--example-C@2] " + + "[org.apache.oozie.core.command.WorkflowRunnerCallable] released lock"); + sb.append("\n2009-06-24 02:43:13,986 WARN _L3_:539 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-C] ACTION[14-200904160239--example-C@2] Use GenericOptionsParser for " + + "parsing the arguments. \n_L3A_Applications should implement Tool for the same. \n_L3B_Multi line " + + "test"); + sb.append("\n2009-06-24 02:43:14,431 WARN _L4_:661 - No job jar file set. User classes may not be found. " + + "See JobConf(Class) or JobConf#setJar(String)."); + sb.append("\n2009-06-24 02:43:14,505 INFO _L5_:317 - USER[oozie] GROUP[oozie] TOKEN[-] APP[-] " + + "JOB[14-200904160239--example-C] ACTION[14-200904160239--example-C@1] Released Lock"); + sb.append("\n2009-06-24 02:43:19,344 DEBUG _L6_:323 - USER[oozie] GROUP[oozie] TOKEN[MYtoken] APP[-] " + + "JOB[-] ACTION[-] Number of pending signals to check [0]"); + sb.append("\n2009-06-24 02:43:29,151 DEBUG _L7_:323 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] " + + "ACTION[-] Number of pending actions [0] "); + + fw.write(sb.toString()); + fw.close(); + return file; + } + + public void testProcessRemainingLog() throws IOException { + XLogStreamer.Filter.reset(); + XLogStreamer.Filter.defineParameter("USER"); + XLogStreamer.Filter.defineParameter("GROUP"); + XLogStreamer.Filter.defineParameter("TOKEN"); + XLogStreamer.Filter.defineParameter("APP"); + XLogStreamer.Filter.defineParameter("JOB"); + XLogStreamer.Filter.defineParameter("ACTION"); + XLogStreamer.Filter xf = new XLogStreamer.Filter(); + xf.setParameter("JOB", "14-200904160239--example-forkjoinwf"); + xf.setLogLevel("DEBUG|WARN"); + + File file = prepareFile1(getTestCaseDir()); + StringWriter sw = new StringWriter(); + new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw); + String[] out = sw.toString().split("\n"); + assertEquals(14, out.length); + assertTrue(out[0].contains("_L1_")); + assertTrue(out[1].contains("_L3_")); + assertTrue(out[2].contains("_L3A_")); + assertTrue(out[3].contains("_L3B_")); + assertTrue(out[4].contains("_L8_")); + assertTrue(out[5].contains("_L9_")); + assertTrue(out[6].contains("_L10_")); + assertTrue(out[7].contains("_L11_")); + assertTrue(out[8].contains("_L12_")); + assertTrue(out[9].contains("_L13_")); + assertTrue(out[10].contains("_L14_")); + assertTrue(out[11].contains("_L15_")); + assertTrue(out[12].contains("_L16_")); + assertTrue(out[13].contains("_L17_")); + } + + public void testProcessRemainingCoordinatorLogForActions() throws IOException { + XLogStreamer.Filter.reset(); + XLogStreamer.Filter.defineParameter("USER"); + XLogStreamer.Filter.defineParameter("GROUP"); + XLogStreamer.Filter.defineParameter("TOKEN"); + XLogStreamer.Filter.defineParameter("APP"); + XLogStreamer.Filter.defineParameter("JOB"); + XLogStreamer.Filter.defineParameter("ACTION"); + XLogStreamer.Filter xf = new XLogStreamer.Filter(); + xf.setParameter("JOB", "14-200904160239--example-C"); + xf.setParameter("ACTION", "14-200904160239--example-C@1"); + + File file = prepareFile2(getTestCaseDir()); + StringWriter sw = new StringWriter(); + new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw); + String[] matches = sw.toString().split("\n"); + assertEquals(2, matches.length); + assertTrue(matches[0].contains("_L1_")); + assertTrue(matches[1].contains("_L5_")); + } + + public void testLifecycle() throws Exception { + XLogStreamer.Filter.reset(); + XLogStreamer.Filter xf = new XLogStreamer.Filter(); + String str1 = "2009-06-24 02:43:13,958 DEBUG _L1_:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-forkjoinwf] ACTION[-] End workflow state change\n"; + String str2 = "2009-06-24 02:43:13,961 INFO _L2_:317 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] " + + "JOB[14-200904160239--example-forkjoinwf] ACTION[-]\n"; + BufferedReader reader = new BufferedReader(new StringReader(str1 + str2)); + TimestampedMessageParser parser = new TimestampedMessageParser(reader, xf); + assertNull(parser.getLastMessage()); + assertNull(parser.getLastTimestamp()); + assertTrue(parser.increment()); + assertEquals(str1, parser.getLastMessage()); + assertEquals("2009-06-24 02:43:13,958", parser.getLastTimestamp()); + assertTrue(parser.increment()); + assertEquals(str2, parser.getLastMessage()); + assertEquals("2009-06-24 02:43:13,961", parser.getLastTimestamp()); + assertFalse(parser.increment()); + assertEquals(str2, parser.getLastMessage()); + assertEquals("2009-06-24 02:43:13,961", parser.getLastTimestamp()); + parser.closeReader(); + } +} Modified: oozie/trunk/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java Thu Aug 29 18:06:57 2013 @@ -33,7 +33,8 @@ public class TestXLogFilter extends XTes XLogStreamer.Filter xf2 = new XLogStreamer.Filter(); xf2.constructPattern(); ArrayList<String> a = new ArrayList<String>(); - a.add("2009-06-24 02:43:13,958 DEBUG"); + a.add("2009-06-24 02:43:13,958"); + a.add(" DEBUG"); a.add(" WorkflowRunnerCallable:323 - " + XLog.Info.get().createPrefix() + " test log"); assertEquals(true, xf2.matches(a)); } Added: oozie/trunk/core/src/test/java/org/apache/oozie/util/TestZKUtils.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestZKUtils.java?rev=1518739&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestZKUtils.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestZKUtils.java Thu Aug 29 18:06:57 2013 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + +import java.util.List; +import java.util.Map; +import static junit.framework.Assert.assertEquals; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.oozie.test.ZKXTestCase; + + +public class TestZKUtils extends ZKXTestCase { + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + private class DummyUser { + private ZKUtils zk = null; + void register() throws Exception { + zk = ZKUtils.register(this); + sleep(1000); // Sleep to allow ZKUtils ServiceCache to update + } + + void unregister() { + if (zk != null) { + zk.unregister(this); + sleep(1000); // Sleep to allow ZKUtils ServiceCache to update + } + zk = null; + } + + ZKUtils getZKUtils() { + return zk; + } + } + + public void testRegisterAdvertiseUnadvertiseUnregister() throws Exception { + CuratorFramework client = getClient(); + ServiceDiscovery<Map> sDiscovery = getServiceDiscovery(); + + assertNull(client.checkExists().forPath("/services")); + assertEquals(0, sDiscovery.queryForInstances("servers").size()); + assertNull(sDiscovery.queryForInstance("servers", ZK_ID)); + DummyUser du = new DummyUser(); + DummyUser du2 = new DummyUser(); + try { + du.register(); + assertNotNull(client.checkExists().forPath("/services")); + assertEquals(1, sDiscovery.queryForInstances("servers").size()); + assertNotNull(sDiscovery.queryForInstance("servers", ZK_ID)); + du2.register(); + assertNotNull(client.checkExists().forPath("/services")); + assertEquals(1, sDiscovery.queryForInstances("servers").size()); + assertNotNull(sDiscovery.queryForInstance("servers", ZK_ID)); + + du.unregister(); + assertNotNull(client.checkExists().forPath("/services")); + assertEquals(1, sDiscovery.queryForInstances("servers").size()); + assertNotNull(sDiscovery.queryForInstance("servers", ZK_ID)); + du2.unregister(); + assertNotNull(client.checkExists().forPath("/services")); + assertEquals(0, sDiscovery.queryForInstances("servers").size()); + assertNull(sDiscovery.queryForInstance("servers", ZK_ID)); + } + finally { + du.unregister(); + du2.unregister(); + } + } + + public void testMetaData() throws Exception { + ServiceDiscovery<Map> sDiscovery = getServiceDiscovery(); + + assertNull(sDiscovery.queryForInstance("servers", ZK_ID)); + DummyUser du = new DummyUser(); + try { + du.register(); + assertNotNull(sDiscovery.queryForInstance("servers", ZK_ID)); + List<ServiceInstance<Map>> allMetaData = du.getZKUtils().getAllMetaData(); + assertEquals(1, allMetaData.size()); + ServiceInstance<Map> meta = allMetaData.get(0); + assertEquals(ZK_ID, meta.getId()); + assertEquals("servers", meta.getName()); + Map<String, String> data = meta.getPayload(); + assertEquals(3, data.size()); + assertEquals(ZK_ID, data.get("OOZIE_ID")); + String url = ConfigUtils.getOozieURL(false); + assertEquals(url, data.get("OOZIE_URL")); + assertEquals("java.util.HashMap", data.get("@class")); + } + finally { + du.unregister(); + } + } + + public void testGetZKId() throws Exception { + DummyUser du = new DummyUser(); + try { + du.register(); + assertEquals(ZK_ID, du.getZKUtils().getZKId()); + } + finally { + du.unregister(); + } + } + + public void testGetZKIdIndex() throws Exception { + DummyUser du = new DummyUser(); + DummyZKOozie dummyOozie = null; + DummyZKOozie dummyOozie2 = null; + try { + dummyOozie = new DummyZKOozie("a", "http://blah"); + du.register(); + assertEquals(1, du.getZKUtils().getZKIdIndex(du.getZKUtils().getAllMetaData())); + dummyOozie2 = new DummyZKOozie("b", "http://blah"); + assertEquals(1, du.getZKUtils().getZKIdIndex(du.getZKUtils().getAllMetaData())); + dummyOozie.teardown(); + assertEquals(0, du.getZKUtils().getZKIdIndex(du.getZKUtils().getAllMetaData())); + } + finally { + du.unregister(); + if (dummyOozie != null) { + dummyOozie.teardown(); + } + if (dummyOozie2 != null) { + dummyOozie2.teardown(); + } + } + } +} Modified: oozie/trunk/distro/src/main/tomcat/ssl-web.xml URL: http://svn.apache.org/viewvc/oozie/trunk/distro/src/main/tomcat/ssl-web.xml?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/distro/src/main/tomcat/ssl-web.xml (original) +++ oozie/trunk/distro/src/main/tomcat/ssl-web.xml Thu Aug 29 18:06:57 2013 @@ -45,4 +45,11 @@ </user-data-constraint> </security-constraint> + <!-- Property used by Oozie to determine that SSL (HTTPS) has been enabled --> + <!-- Do not remove or change this --> + <context-param> + <param-name>ssl.enabled</param-name> + <param-value>true</param-value> + </context-param> + </web-app> Modified: oozie/trunk/docs/src/site/twiki/AG_Install.twiki URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/AG_Install.twiki?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/docs/src/site/twiki/AG_Install.twiki (original) +++ oozie/trunk/docs/src/site/twiki/AG_Install.twiki Thu Aug 29 18:06:57 2013 @@ -693,6 +693,101 @@ similar. You will probably have to add y Refer to the [[./oozie-default.xml][oozie-default.xml]] for details. +#HA +---+++ High Availability (HA) + +Multiple Oozie Servers can be configured against the same database to provide High Availability (HA) of the Oozie service. + +---++++ Pre-requisites + +1. A database that supports multiple concurrent connections. In order to have full HA, the database should also have HA support, or +it becomes a single point of failure. + +*NOTE:* The default derby database does not support this + +2. A ZooKeeper ensemble. + +Apache ZooKeeper is a distributed, open-source coordination service for distributed applications; the Oozie servers use it for +coordinating access to the database and communicating with each other. In order to have full HA, there should be at least 3 +ZooKeeper servers. +More information on ZooKeeper can be found [[http://zookeeper.apache.org][here]]. + +3. Multiple Oozie servers. + +*IMPORTANT:* While not strictly required for all configuration properties, all of the servers should ideally have exactly the same +configuration for consistency's sake. + +4. A Loadbalancer, Virtual IP, or Round-Robin DNS. + +This is used to provide a single entry-point for users and for callbacks from the JobTracker. The load balancer should be +configured for round-robin between the Oozie servers to distribute the requests. Users (using either the Oozie client, a web +browser, or the REST API) should connect through the load balancer. In order to have full HA, the load balancer should also have +HA support, or it becomes a single point of failure. + +---++++ Installation/Configuration Steps + +1. Install identically configured Oozie servers normally. Make sure they are all configured against the same database and make sure +that you DO NOT start them yet. + +2. Add the following services to the extension services configuration property in oozie-site.xml in all Oozie servers. This will +make Oozie use the ZooKeeper versions of these services instead of the default implementations. + +<verbatim> +<property> + <name>oozie.services.ext</name> + <value> + org.apache.oozie.service.ZKLocksService, + org.apache.oozie.service.ZKXLogStreamingService, + org.apache.oozie.service.ZKJobsConcurrencyService + </value> +</property> +</verbatim> + +3. Add the following property to oozie-site.xml in all Oozie servers. It should be a comma-separated list of host:port pairs of the +ZooKeeper servers. The default value is shown below. + +<verbatim> +<property> + <name>oozie.zookeeper.connection.string</name> + <value>localhost:2181</value> +</property> +</verbatim> + +4. (Optional) Add the following properties to oozie-site.xml in all Oozie servers. + +The namespace to use. All of the Oozie Servers that are planning on talking to each other should have the same namespace. If there +are multiple Oozie setups each doing their own HA, they should have their own namespace. The default value is shown below. + +<verbatim> +<property> + <name>oozie.zookeeper.namespace</name> + <value>oozie</value> +</property> +</verbatim> + +The ID for the Oozie server to use when talking to ZooKeeper and other Oozie servers. Each Oozie server must have a unique ID. If +blank, Oozie will use the hostname. + +<verbatim> +<property> + <name>oozie.zookeeper.oozie.id</name> + <value> </value> +</property> +</verbatim> + +5. Change the value of OOZIE_BASE_URL in oozie-env.sh to point to the loadbalancer or virtual IP, for example: + +<verbatim> +export OOZIE_BASE_URL="http://my.loadbalancer.hostname:11000/oozie" +</verbatim> + +6. Start the ZooKeeper servers. + +7. Start the Oozie servers. + +Note: If one of the Oozie servers becomes unavailable, querying Oozie for the logs from a job in the Web UI, REST API, or client may +be missing information until that server comes back up. + ---++ Starting and Stopping Oozie Use the standard Tomcat commands to start and stop Oozie. Modified: oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki (original) +++ oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki Thu Aug 29 18:06:57 2013 @@ -75,6 +75,7 @@ usage: -doas <arg> doAs user, impersonates as the specified user. -oozie <arg> Oozie URL -queuedump show Oozie server queue elements + -servers list available Oozie servers (more than one only if HA is enabled) -status show the current system status -systemmode <arg> Supported in Oozie-2.0 or later versions ONLY. Change oozie system mode [NORMAL|NOWEBSERVICE|SAFEMODE] @@ -781,6 +782,19 @@ $ oozie admin -oozie http://localhost:11 It returns the Oozie server current queued commands. +---+++ Displaying the list of available Oozie Servers + +Example: + +<verbatim> +$ oozie admin http://localhost:11000/oozie -servers +OozieA : http://localhost:11000/oozie +OozieB : http://localhost:12000/oozie +OozieC : http://localhost:13000/oozie +</verbatim> + +It returns a list of available Oozie Servers. This is useful when Oozie is configured for [[AG_Install#HA][High Availability]]; if +not, it will simply return the one Oozie Server. ---++ Validate Operations Modified: oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki (original) +++ oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki Thu Aug 29 18:06:57 2013 @@ -357,6 +357,30 @@ A HTTP GET request returns the queue dum GET /oozie/v1/admin/queue-dump </verbatim> +---++++ Available Oozie Servers + +A HTTP GET request returns the list of available Oozie Servers. This is useful when Oozie is configured +for [[AG_Install#HA][High Availability]]; if not, it will simply return the one Oozie Server. + +*Request:* + +<verbatim> +GET /oozie/v2/admin/available-oozie-servers +</verbatim> + +*Response:* + +<verbatim> +HTTP/1.1 200 OK +Content-Type: application/json;charset=UTF-8 +. +{ + "OozieA": "http://localhost:11000/oozie", + "OozieB": "http://localhost:12000/oozie", + "OozieC": "http://localhost:13000/oozie", +} +</verbatim> + ---+++ Job and Jobs End-Points _Modified in Oozie v1 WS API_ Modified: oozie/trunk/pom.xml URL: http://svn.apache.org/viewvc/oozie/trunk/pom.xml?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/pom.xml (original) +++ oozie/trunk/pom.xml Thu Aug 29 18:06:57 2013 @@ -699,6 +699,24 @@ <version>1.8.5</version> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>2.2.0-incubating</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <version>2.2.0-incubating</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>2.2.0-incubating</version> + </dependency> + <!-- examples --> <dependency> <groupId>commons-httpclient</groupId> Modified: oozie/trunk/release-log.txt URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1518739&r1=1518738&r2=1518739&view=diff ============================================================================== --- oozie/trunk/release-log.txt (original) +++ oozie/trunk/release-log.txt Thu Aug 29 18:06:57 2013 @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-615 Support high availability for the Oozie service (rkanter) OOZIE-1486 Cut down on number of small files created to track a running action (mona) OOZIE-1476 Add ability to issue kill on Coordinator Action directly with id and nominal daterange (mona) OOZIE-1495 inconsistent behavior of chmod/chgrp when path doesn't exist after glob support (ryota)
