Author: cdouglas
Date: Sun Jun 5 05:22:59 2011
New Revision: 1131737
URL: http://svn.apache.org/viewvc?rev=1131737&view=rev
Log:
MAPREDUCE-2529. Add support for regex-based shuffle metric counting
exceptions. Contributed by Thomas Graves
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131737&r1=1131736&r2=1131737&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sun Jun 5 05:22:59 2011
@@ -56,6 +56,9 @@ Release 0.20.205.0 - unreleased
MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
fail faulty maps more aggressively. (Thomas Graves via cdouglas)
+ MAPREDUCE-2529. Add support for regex-based shuffle metric counting
+ exceptions. (Thomas Graves via cdouglas)
+
Release 0.20.204.0 - unreleased
NEW FEATURES
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java?rev=1131737&r1=1131736&r2=1131737&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java Sun Jun 5 05:22:59 2011
@@ -37,6 +37,8 @@ class ShuffleServerInstrumentation imple
registry.newCounter("shuffle_failed_outputs", "", 0);
final MetricMutableCounterInt successOutputs =
registry.newCounter("shuffle_success_outputs", "", 0);
+ final MetricMutableCounterInt exceptionsCaught =
+ registry.newCounter("shuffle_exceptions_caught", "", 0);
ShuffleServerInstrumentation(TaskTracker tt) {
ttWorkerThreads = tt.workerThreads;
@@ -69,6 +71,12 @@ class ShuffleServerInstrumentation imple
successOutputs.incr();
}
+ //@Override
+ void exceptionsCaught() {
+ exceptionsCaught.incr();
+ }
+
+
@Override
public void getMetrics(MetricsBuilder builder, boolean all) {
MetricsRecordBuilder rb = builder.addRecord(registry.name());
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1131737&r1=1131736&r2=1131737&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Jun 5 05:22:59 2011
@@ -1468,6 +1468,15 @@ public class TaskTracker implements MRCo
server.setAttribute("log", LOG);
server.setAttribute("localDirAllocator", localDirAllocator);
server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+
+ String exceptionStackRegex =
+ conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
+ String exceptionMsgRegex =
+ conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+
+ server.setAttribute("exceptionStackRegex", exceptionStackRegex);
+ server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+
server.addInternalServlet("mapOutput", "/mapOutput",
MapOutputServlet.class);
server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
server.start();
@@ -3682,6 +3691,10 @@ public class TaskTracker implements MRCo
(ShuffleServerInstrumentation)
context.getAttribute("shuffleServerMetrics");
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");
+ String exceptionStackRegex =
+ (String) context.getAttribute("exceptionStackRegex");
+ String exceptionMsgRegex =
+ (String) context.getAttribute("exceptionMsgRegex");
verifyRequest(request, response, tracker, jobId);
@@ -3787,12 +3800,14 @@ public class TaskTracker implements MRCo
" from map: " + mapId + " given " + info.partLength + "/" +
info.rawLength);
}
+
} catch (IOException ie) {
Log log = (Log) context.getAttribute("log");
String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
") failed :\n"+
StringUtils.stringifyException(ie));
log.warn(errorMsg);
+ checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
if (isInputException) {
tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
}
@@ -3816,6 +3831,38 @@ public class TaskTracker implements MRCo
shuffleMetrics.successOutput();
}
+ protected void checkException(IOException ie, String exceptionMsgRegex,
+ String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
+ // Count the exception only if it matches the configured regular expressions.
+ // If both msgRegex and stackRegex are set, both must match; if only one is
+ // set, only that one has to match; if neither is set, it is always counted.
+ if (exceptionMsgRegex != null) {
+ String msg = ie.getMessage();
+ if (msg == null || !msg.matches(exceptionMsgRegex)) {
+ return;
+ }
+ }
+ if (exceptionStackRegex != null
+ && !checkStackException(ie, exceptionStackRegex)) {
+ return;
+ }
+ shuffleMetrics.exceptionsCaught();
+ }
+
+ private boolean checkStackException(IOException ie,
+ String exceptionStackRegex) {
+ StackTraceElement[] stack = ie.getStackTrace();
+
+ for (StackTraceElement elem : stack) {
+ String stacktrace = elem.toString();
+ if (stacktrace.matches(exceptionStackRegex)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
/**
* verify that request has correct HASH for the url
* and also add a field to reply header with hash of the HASH
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java?rev=1131737&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java Sun Jun 5 05:22:59 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+
+import org.junit.Test;
+
+public class TestShuffleExceptionCount {
+
+ public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+
+ public void checkException(IOException ie, String exceptionMsgRegex,
+ String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
+ super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ }
+
+ }
+
+ @Test
+ public void testCheckException() throws IOException, InterruptedException {
+ TestMapOutputServlet testServlet = new TestMapOutputServlet();
+ JobConf conf = new JobConf();
+ conf.setUser("testuser");
+ conf.setJobName("testJob");
+ conf.setSessionId("testSession");
+
+ TaskTracker tt = new TaskTracker();
+ tt.setConf(conf);
+ ShuffleServerInstrumentation shuffleMetrics =
+ ShuffleServerInstrumentation.create(tt);
+
+ // first test with only MsgRegex set but doesn't match
+ String exceptionMsgRegex = "Broken pipe";
+ String exceptionStackRegex = null;
+ IOException ie = new IOException("EOFException");
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ MetricsRecordBuilder rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 0, rb);
+
+ // test with only MsgRegex set that does match
+ ie = new IOException("Broken pipe");
+ exceptionStackRegex = null;
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 1, rb);
+
+ // test with neither set, make sure incremented
+ exceptionMsgRegex = null;
+ exceptionStackRegex = null;
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 2, rb);
+
+ // test with only StackRegex set doesn't match
+ exceptionMsgRegex = null;
+ exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
+ ie.setStackTrace(constructStackTrace());
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 2, rb);
+
+ // test with only StackRegex set does match
+ exceptionMsgRegex = null;
+ exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 3, rb);
+
+ // test with both regex set and matches
+ exceptionMsgRegex = "Broken pipe";
+ ie.setStackTrace(constructStackTraceTwo());
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 4, rb);
+
+ // test with both regex set and only msg matches
+ exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 4, rb);
+
+ // test with both regex set and only stack matches
+ exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+ exceptionMsgRegex = "EOFException";
+ testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+ shuffleMetrics);
+ rb = getMetrics(shuffleMetrics);
+ assertCounter("shuffle_exceptions_caught", 4, rb);
+
+ }
+
+ /*
+ * Construct an exception like:
+ * java.io.IOException: Broken pipe at
+ * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+ * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+ * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+ * org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+ * org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:709) at
+ * org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192) at
+ * org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124) at
+ * org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708) at
+ * org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
+ */
+ private StackTraceElement[] constructStackTrace() {
+ StackTraceElement[] stack = new StackTraceElement[9];
+ stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+ stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+ stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+ stack[3] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup", "SelectorManager.java", 831);
+ stack[4] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "doSelect", "SelectorManager.java", 709);
+ stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectorManager", "doSelect", "SelectorManager.java", 192);
+ stack[6] = new StackTraceElement("org.mortbay.jetty.nio.SelectChannelConnector", "accept", "SelectChannelConnector.java", 124);
+ stack[7] = new StackTraceElement("org.mortbay.jetty.AbstractConnector$Acceptor", "run", "AbstractConnector.java", 708);
+ stack[8] = new StackTraceElement("org.mortbay.thread.QueuedThreadPool$PoolThread", "run", "QueuedThreadPool.java", 582);
+
+ return stack;
+ }
+
+ /*
+ * java.io.IOException: Broken pipe at
+ * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+ * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+ * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+ * org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+ * org.mortbay.io.nio.SelectChannelEndPoint.updateKey(SelectChannelEndPoint.java:335) at
+ * org.mortbay.io.nio.SelectChannelEndPoint.blockWritable(SelectChannelEndPoint.java:278) at
+ * org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator.java:545) at
+ * org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:572) at
+ * org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012) at
+ * org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:651) at
+ * org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:580) at
+ */
+ private StackTraceElement[] constructStackTraceTwo() {
+ StackTraceElement[] stack = new StackTraceElement[11];
+ stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+ stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+ stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+ stack[3] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup", "SelectorManager.java", 831);
+ stack[4] = new StackTraceElement("org.mortbay.io.nio.SelectChannelEndPoint", "updateKey", "SelectChannelEndPoint.java", 335);
+ stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectChannelEndPoint", "blockWritable", "SelectChannelEndPoint.java", 278);
+ stack[6] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "blockForOutput", "AbstractGenerator.java", 545);
+ stack[7] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "flush", "AbstractGenerator.java", 572);
+ stack[8] = new StackTraceElement("org.mortbay.jetty.HttpConnection$Output", "flush", "HttpConnection.java", 1012);
+ stack[9] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "write", "AbstractGenerator.java", 651);
+ stack[10] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "write", "AbstractGenerator.java", 580);
+
+ return stack;
+ }
+
+}