/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.debugger.gui;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import javax.ws.rs.core.MediaType;

import org.apache.giraph.debugger.mock.ComputationComputeTestGenerator;
import org.apache.giraph.debugger.mock.MasterComputeTestGenerator;
import org.apache.giraph.debugger.mock.TestGraphGenerator;
import org.apache.giraph.debugger.utils.DebuggerUtils;
import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;

import com.google.common.collect.Lists;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

/**
 * Entry point to the HTTP Debugger Server.
 *
 * Registers one handler per REST endpoint and a catch-all handler that
 * serves the static GUI resources found next to this class on the classpath.
 */
public class Server {

  /**
   * Logger for the class.
   */
  private static final Logger LOG = Logger.getLogger(Server.class);
  /**
   * Port the server listens on. Read from the system property
   * giraph.debugger.guiPort, defaulting to 8000.
   */
  private static final int SERVER_PORT = Integer.parseInt(System.getProperty(
    "giraph.debugger.guiPort", "8000"));

  /**
   * Private constructor to disallow construction outside of the class.
   */
  private Server() { }

  /**
   * Creates the HTTP server, registers all handlers and starts it.
   *
   * @param args command line arguments for the server (unused)
   * @throws Exception if the server cannot be created or started
   */
  public static void main(String[] args) throws Exception {
    HttpServer server = HttpServer
      .create(new InetSocketAddress(SERVER_PORT), 0);
    // Attach one handler per endpoint; "/" serves the static GUI files.
    server.createContext("/vertices", new GetVertices());
    server.createContext("/supersteps", new GetSupersteps());
    server.createContext("/scenario", new GetScenario());
    server.createContext("/integrity", new GetIntegrity());
    server.createContext("/test/vertex", new GetVertexTest());
    server.createContext("/test/master", new GetMasterTest());
    server.createContext("/test/graph", new GetTestGraph());
    server.createContext("/", new GetEditor());
    // Passing null makes the server use its default executor.
    server.setExecutor(null);
    server.start();
  }

  /**
   * Parses and validates the superstep number shared by all endpoints.
   *
   * @param superstepId raw superstep parameter taken from the URL
   * @return the parsed superstep number, which is always >= -1
   * @throws NumberFormatException if the value is not a number or is
   *         smaller than -1
   */
  private static long parseSuperstepNo(String superstepId) {
    long superstepNo = Long.parseLong(superstepId);
    if (superstepNo < -1) {
      throw new NumberFormatException("Superstep must be integer >= -1.");
    }
    return superstepNo;
  }

  /**
   * Handler when accessing the landing page for the server. Serves static
   * resources that are resolved relative to this class on the classpath.
   */
  static class GetEditor implements HttpHandler {

    /**
     * Size of the buffer used to copy a resource into the response.
     */
    private static final int BUFFER_SIZE = 0x10000;

    @Override
    public void handle(HttpExchange t) {
      URI uri = t.getRequestURI();
      try {
        String path = uri.getPath();
        LOG.debug(path);
        if (path.endsWith("/")) {
          path += "index.html";
        }
        path = path.replaceFirst("^/", "");
        LOG.debug("resource path to look for = " + path);
        LOG.debug("resource URL = " + getClass().getResource(path));
        // The path comes from the client: never follow parent-directory
        // segments out of the GUI resource directory.
        InputStream resource = hasParentSegment(path) ? null :
          getClass().getResourceAsStream(path);
        if (resource == null) {
          // Object does not exist or is not a file: reject with 404 error.
          sendNotFound(t);
        } else {
          // Object exists and is a file: accept with response code 200.
          try (InputStream fs = resource) {
            // Length 0 selects chunked encoding for a body of unknown size.
            t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
            try (OutputStream os = t.getResponseBody()) {
              final byte[] buffer = new byte[BUFFER_SIZE];
              int count;
              while ((count = fs.read(buffer)) >= 0) {
                os.write(buffer, 0, count);
              }
            }
          }
        }
      } catch (IOException e) {
        LOG.error("Failed to serve " + uri, e);
        // Headers can be sent only once; -1 means they are still pending.
        if (t.getResponseCode() == -1) {
          try {
            t.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1);
          } catch (IOException ignored) {
            // The connection is unusable; closing the exchange below is all
            // that is left to do.
            LOG.warn("Could not send error response for " + uri);
          }
        }
      } finally {
        // Always release the exchange so that the client is never left
        // waiting on an unterminated response.
        t.close();
      }
    }

    /**
     * @param path resource path requested by the client
     * @return true if any segment of the path is ".."
     */
    private static boolean hasParentSegment(String path) {
      return ("/" + path + "/").contains("/../");
    }

    /**
     * Sends the plain 404 response used for unknown resources.
     *
     * @param t exchange to answer
     * @throws IOException if the response cannot be written
     */
    private static void sendNotFound(HttpExchange t) throws IOException {
      byte[] body = "404 (Not Found)\n".getBytes(StandardCharsets.UTF_8);
      t.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, body.length);
      try (OutputStream os = t.getResponseBody()) {
        os.write(body);
      }
    }
  }

  /**
   * Returns the list of vertices debugged in a given Superstep for a given job.
   *
   * URL parameters: {jobId, superstepId}
   */
  static class GetVertices extends ServerHttpHandler {
    @Override
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        // jobId and superstepId are mandatory. Validate.
        if (jobId == null || superstepId == null) {
          throw new IllegalArgumentException("Missing mandatory params.");
        }
        // May throw NumberFormatException. Handled below.
        long superstepNo = parseSuperstepNo(superstepId);
        // May throw IOException. Handled below.
        List<String> vertexIds = ServerUtils.getVerticesDebugged(jobId,
          superstepNo, DebugTrace.VERTEX_ALL);
        this.statusCode = HttpURLConnection.HTTP_OK;
        // Returns output as an array ["id1", "id2", "id3" .... ]
        this.response = new JSONArray(vertexIds).toString();
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s and %s are mandatory parameter.",
          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }

  /**
   * Returns the supersteps traced for the given job.
   *
   * URL parameters: {jobId}
   */
  static class GetSupersteps extends ServerHttpHandler {
    @Override
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        // jobId is mandatory. Validate.
        if (jobId == null) {
          throw new IllegalArgumentException("Missing mandatory params.");
        }
        // May throw IOException. Handled below.
        List<Long> superstepIds = ServerUtils.getSuperstepsDebugged(jobId);
        this.statusCode = HttpURLConnection.HTTP_OK;
        // Returns output as an array ["id1", "id2", "id3" .... ]
        this.response = new JSONArray(superstepIds).toString();
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s is a mandatory parameter.",
          ServerUtils.JOB_ID_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }

  /**
   * Returns the scenario for a given superstep of a given job.
   *
   * URL Params: {jobId, superstepId, [vertexId]}
   * vertexId: vertexId is optional. It can be a single value or a comma
   * separated list. If it is not supplied, returns the scenario for all
   * vertices.
   */
  static class GetScenario extends ServerHttpHandler {
    @Override
    @SuppressWarnings("rawtypes")
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        // Check both jobId and superstepId are present
        if (jobId == null || superstepId == null) {
          throw new IllegalArgumentException("Missing mandatory parameters");
        }
        // May throw NumberFormatException. Handled below.
        long superstepNo = parseSuperstepNo(superstepId);
        List<String> vertexIds;
        // Get the single vertexId or the list of vertexIds (comma-separated).
        String rawVertexIds = paramMap.get(ServerUtils.VERTEX_ID_KEY);
        if (rawVertexIds == null) {
          // No vertex Id supplied. Read scenario for all vertices.
          // May throw IOException. Handled below.
          vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
            DebugTrace.VERTEX_ALL);
        } else {
          // Split the vertices by comma.
          vertexIds = Lists.newArrayList(rawVertexIds.split(","));
        }
        // Send JSON by default.
        JSONObject scenarioObj = new JSONObject();
        for (String vertexId : vertexIds) {
          GiraphVertexScenarioWrapper giraphScenarioWrapper =
            ServerUtils.readScenarioFromTrace(jobId, superstepNo,
              vertexId.trim(), DebugTrace.VERTEX_REGULAR);
          scenarioObj.put(vertexId,
            ServerUtils.scenarioToJSON(giraphScenarioWrapper));
        }
        // Set status as OK and convert JSONObject to string.
        this.statusCode = HttpURLConnection.HTTP_OK;
        this.response = scenarioObj.toString();
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s and %s are mandatory parameter.",
          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }

  /**
   * Returns the JAVA code for vertex scenario.
   *
   * URL Params: {jobId, superstepId, vertexId, traceType}
   * traceType: Can be one of reg, err, msg or vv
   */
  static class GetVertexTest extends ServerHttpHandler {
    @Override
    @SuppressWarnings("rawtypes")
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
      String vertexId = paramMap.get(ServerUtils.VERTEX_ID_KEY);
      String traceType = paramMap.get(ServerUtils.VERTEX_TEST_TRACE_TYPE_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        // Check jobId, superstepId, vertexId and traceType are present
        if (jobId == null || superstepId == null || vertexId == null ||
          traceType == null) {
          throw new IllegalArgumentException("Missing mandatory parameters");
        }
        long superstepNo = parseSuperstepNo(superstepId);
        DebugTrace debugTrace = DebuggerUtils
          .getVertexDebugTraceForPrefix(traceType);
        GiraphVertexScenarioWrapper giraphScenarioWrapper = ServerUtils
          .readScenarioFromTrace(jobId, superstepNo, vertexId.trim(),
            debugTrace);
        ComputationComputeTestGenerator testGenerator =
          new ComputationComputeTestGenerator();
        String testClassName = String.format("%sTest_%s_S%s_V%s",
          giraphScenarioWrapper.getVertexScenarioClassesWrapper()
            .getClassUnderTest().getSimpleName(), jobId, superstepId,
          vertexId);
        // Generate first: if this throws, the error response must not be
        // delivered as a file download.
        String testCode = testGenerator.generateTest(giraphScenarioWrapper,
          null /* testPackage is optional */, testClassName);
        // Set the content-disposition header to force a download with the
        // given filename.
        String filename = String.format("%s.java", testClassName);
        this.setResponseHeader("Content-Disposition",
          String.format("attachment; filename=\"%s\"", filename));
        this.statusCode = HttpURLConnection.HTTP_OK;
        this.responseContentType = MediaType.TEXT_PLAIN;
        this.response = testCode;
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s, %s and %s are mandatory parameter.",
          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY,
          ServerUtils.VERTEX_ID_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }

  /**
   * Returns the JAVA code for master scenario.
   *
   * URL Params: {jobId, superstepId}
   */
  static class GetMasterTest extends ServerHttpHandler {
    @Override
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        // Check both jobId and superstepId are present
        if (jobId == null || superstepId == null) {
          throw new IllegalArgumentException("Missing mandatory parameters");
        }
        long superstepNo = parseSuperstepNo(superstepId);
        GiraphMasterScenarioWrapper giraphScenarioWrapper = ServerUtils
          .readMasterScenarioFromTrace(jobId, superstepNo,
            DebugTrace.MASTER_ALL);
        MasterComputeTestGenerator masterTestGenerator =
          new MasterComputeTestGenerator();
        // Strip the package so that only the simple class name remains.
        String testClassName = String.format("%sTest_%s_S%s",
          giraphScenarioWrapper.getMasterClassUnderTest()
            .replaceFirst(".*\\.", ""), jobId, superstepId);
        // Generate first: if this throws, the error response must not be
        // delivered as a file download.
        String testCode = masterTestGenerator.generateTest(
          giraphScenarioWrapper, null /* testPackage is optional */,
          testClassName);
        // Set the content-disposition header to force a download with the
        // given filename.
        String filename = String.format("%s.java", testClassName);
        this.setResponseHeader("Content-Disposition",
          String.format("attachment; filename=\"%s\"", filename));
        this.statusCode = HttpURLConnection.HTTP_OK;
        this.responseContentType = MediaType.TEXT_PLAIN;
        this.response = testCode;
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s and %s are mandatory parameter.",
          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }

  /**
   * Returns the integrity violations based on the requested parameter. The
   * requested parameter (type) must be one of M, E or V.
   *
   * URL Params: {jobId, superstepId, type, [vertexId]}
   * vertexId: optional and only used when type = E. It can be a single value
   * or a comma separated list.
   */
  static class GetIntegrity extends ServerHttpHandler {
    /**
     * The server returns only a limited number of msg or vertex value
     * violations. For message violations, it may not put the limit at exactly
     * this number because it reads each violation trace which may include
     * multiple message violations and adds all the violations in the trace to
     * the response. Once the total message violations is over this number it
     * stops reading traces.
     */
    private static final int NUM_VIOLATIONS_THRESHOLD = 50;

    @Override
    @SuppressWarnings("rawtypes")
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
      String violationType = paramMap
        .get(ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        if (jobId == null || superstepId == null || violationType == null) {
          throw new IllegalArgumentException("Missing mandatory parameters");
        }
        long superstepNo = parseSuperstepNo(superstepId);
        // JSON object that will be finally returned.
        JSONObject integrityObj = new JSONObject();
        if (violationType.equals("M")) {
          // Message violation
          List<String> taskIds = ServerUtils.getTasksWithIntegrityViolations(
            jobId, superstepNo, DebugTrace.INTEGRITY_MESSAGE_ALL);
          int numViolations = 0;
          for (String taskId : taskIds) {
            MsgIntegrityViolationWrapper msgIntegrityViolationWrapper =
              ServerUtils.readMsgIntegrityViolationFromTrace(jobId, taskId,
                superstepNo);
            integrityObj.put(taskId,
              ServerUtils.msgIntegrityToJson(msgIntegrityViolationWrapper));
            numViolations += msgIntegrityViolationWrapper.numMsgWrappers();
            if (numViolations >= NUM_VIOLATIONS_THRESHOLD) {
              break;
            }
          }
        } else if (violationType.equals("V")) {
          // Vertex value violation
          List<String> vertexIds = ServerUtils.getVerticesDebugged(jobId,
            superstepNo, DebugTrace.INTEGRITY_VERTEX);
          int numViolations = 0;
          for (String vertexId : vertexIds) {
            GiraphVertexScenarioWrapper giraphVertexScenarioWrapper =
              ServerUtils.readVertexIntegrityViolationFromTrace(jobId,
                superstepNo, vertexId);
            numViolations++;
            integrityObj.put(vertexId,
              ServerUtils.vertexIntegrityToJson(giraphVertexScenarioWrapper));
            if (numViolations >= NUM_VIOLATIONS_THRESHOLD) {
              break;
            }
          }
        } else if (violationType.equals("E")) {
          // Exceptions
          List<String> vertexIds;
          // Get the single vertexId or the list of vertexIds (comma-separated).
          String rawVertexIds = paramMap.get(ServerUtils.VERTEX_ID_KEY);
          if (rawVertexIds == null) {
            // No vertex Id supplied. Read exceptions for all vertices.
            vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
              DebugTrace.VERTEX_EXCEPTION);
          } else {
            // Split the vertices by comma.
            vertexIds = Lists.newArrayList(rawVertexIds.split(","));
          }
          for (String vertexId : vertexIds) {
            GiraphVertexScenarioWrapper giraphScenarioWrapper =
              ServerUtils.readScenarioFromTrace(jobId, superstepNo,
                vertexId.trim(), DebugTrace.VERTEX_EXCEPTION);
            integrityObj.put(vertexId,
              ServerUtils.scenarioToJSON(giraphScenarioWrapper));
          }
        } else {
          // Without this branch neither the status nor the response would be
          // set for an unsupported type.
          throw new IllegalArgumentException(
            "Unknown violation type: " + violationType);
        }
        // Set status as OK and convert JSONObject to string.
        this.response = integrityObj.toString();
        this.statusCode = HttpURLConnection.HTTP_OK;
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s, %s and %s are mandatory parameter.",
          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY,
          ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }

  /**
   * Returns the TestGraph JAVA code.
   *
   * URL Param: adjList - Adjacency list of the graph
   */
  static class GetTestGraph extends ServerHttpHandler {
    @Override
    public void processRequest(HttpExchange httpExchange,
      Map<String, String> paramMap) {
      String adjList = paramMap.get(ServerUtils.ADJLIST_KEY);
      // CHECKSTYLE: stop IllegalCatch
      try {
        // Check that the adjacency list is present
        if (adjList == null) {
          throw new IllegalArgumentException("Missing mandatory parameters");
        }
        TestGraphGenerator testGraphGenerator = new TestGraphGenerator();
        // One adjacency list entry per line. Generate before touching the
        // headers so that a failure is not delivered as a file download.
        String testGraph = testGraphGenerator.generate(adjList.split("\n"));
        this.setResponseHeader("Content-Disposition",
          "attachment; filename=graph.java");
        this.statusCode = HttpURLConnection.HTTP_OK;
        this.responseContentType = MediaType.TEXT_PLAIN;
        this.response = testGraph;
      } catch (Exception e) {
        this.handleException(e, String.format(
          "Invalid parameters. %s is mandatory parameter.",
          ServerUtils.ADJLIST_KEY));
      }
      // CHECKSTYLE: resume IllegalCatch
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.debugger.gui;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import javax.ws.rs.core.MediaType;

import org.apache.log4j.Logger;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

/**
 * The Abstract class for HTTP handlers.
 *
 * The response is kept in instance fields that are reset at the start of
 * every {@link #handle(HttpExchange)} call, so an instance must not handle
 * two requests at the same time.
 */
public abstract class ServerHttpHandler implements HttpHandler {

  /**
   * Logger for this class.
   */
  private static final Logger LOG = Logger.getLogger(ServerHttpHandler.class);
  /**
   * Response body.
   */
  protected String response;
  /**
   * Response body as a byte array
   */
  protected byte[] responseBytes;
  /**
   * Response status code. Please use HttpUrlConnection final static members.
   */
  protected int statusCode;
  /**
   * MimeType of the response. Please use MediaType final static members.
   */
  protected String responseContentType;
  /**
   * HttpExchange object received in the handle call.
   */
  protected HttpExchange httpExchange;

  /**
   * Handles an HTTP call's lifecycle - read parameters, process and send
   * response.
   * @param httpExchange the http exchange object.
   * @throws IOException if the response cannot be written.
   */
  @Override
  public void handle(HttpExchange httpExchange) throws IOException {
    // Assign class members so that subsequent methods can use it.
    this.httpExchange = httpExchange;
    // The fields outlive a single call, so clear whatever the previous
    // request left behind. A request that sets nothing is reported as an
    // internal error instead of replaying a stale response.
    this.response = null;
    this.responseBytes = null;
    this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
    // Set application/json as the default content type.
    this.responseContentType = MediaType.APPLICATION_JSON;
    String rawUrl = httpExchange.getRequestURI().getQuery();
    Map<String, String> paramMap = null;
    // CHECKSTYLE: stop IllegalCatch
    try {
      paramMap = ServerUtils.getUrlParams(rawUrl);
    } catch (UnsupportedEncodingException ex) {
      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
      this.response = "Malformed URL. Given encoding is not supported.";
    } catch (RuntimeException ex) {
      // Raised for query strings that cannot be split into key=value pairs
      // or that contain broken percent escapes.
      LOG.error("Could not parse the query string: " + rawUrl, ex);
      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
      this.response = "Malformed URL. Could not parse the query parameters.";
    }
    // CHECKSTYLE: resume IllegalCatch
    if (paramMap != null) {
      LOG.info(httpExchange.getRequestURI().getPath() + paramMap.toString());
      // Call the method implemented by inherited classes.
      processRequest(httpExchange, paramMap);
    }
    // In case of an error statusCode, we just write the exception string.
    // (Consider using JSON).
    if (this.statusCode != HttpURLConnection.HTTP_OK) {
      this.responseContentType = MediaType.TEXT_PLAIN;
    }
    // Set mandatory Response Headers.
    this.setMandatoryResponseHeaders();
    this.writeResponse();
  }

  /**
   * Writes the response. responseBytes is sent for
   * application/octet-stream responses, the text response (encoded as UTF-8)
   * for every other content type. The exchange is always closed afterwards.
   *
   * @throws IOException if the headers or the body cannot be sent.
   */
  private void writeResponse() throws IOException {
    byte[] body;
    if (MediaType.APPLICATION_OCTET_STREAM.equals(this.responseContentType) &&
      this.responseBytes != null) {
      body = this.responseBytes;
    } else if (this.response != null) {
      // The announced length must be the number of bytes written, which
      // differs from the number of characters for non-ASCII text.
      body = this.response.getBytes(StandardCharsets.UTF_8);
    } else {
      LOG.warn("No response body was set for " +
        this.httpExchange.getRequestURI());
      body = new byte[0];
    }
    try {
      this.httpExchange.sendResponseHeaders(this.statusCode, body.length);
      try (OutputStream os = this.httpExchange.getResponseBody()) {
        os.write(body);
      }
    } finally {
      this.httpExchange.close();
    }
  }

  /**
   * Add mandatory headers to the HTTP response by the debugger server. MUST be
   * called before sendResponseHeaders.
   */
  private void setMandatoryResponseHeaders() {
    // TODO(vikesh): **REMOVE CORS FOR ALL AFTER DECIDING THE DEPLOYMENT
    // ENVIRONMENT**
    Headers headers = this.httpExchange.getResponseHeaders();
    headers.add("Access-Control-Allow-Origin", "*");
    headers.add("Content-Type", this.responseContentType);
  }

  /**
   * Sets the given headerKey to the given headerValue. For example, call
   * setResponseHeader("Content-disposition", "attachment") to set the
   * Content-disposition header.
   *
   * @param headerKey - Header Key
   * @param headerValue - Header Value.
   */
  protected void setResponseHeader(String headerKey, String headerValue) {
    Headers responseHeaders = this.httpExchange.getResponseHeaders();
    responseHeaders.add(headerKey, headerValue);
  }

  /**
   * Handle the common exceptions in processRequest by translating them into
   * a status code and a plain text response.
   *
   * @param e thrown exception.
   * @param illegalArgumentMessage - Message when illegal argument
   *        exception is thrown. Optional - May be null.
   */
  protected void handleException(Exception e, String illegalArgumentMessage) {
    LOG.error("Request failed: " + e, e);
    // NumberFormatException is a subclass of IllegalArgumentException, so it
    // has to be tested first.
    if (e instanceof NumberFormatException) {
      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
      this.response = String.format("%s must be an integer >= -1.",
        ServerUtils.SUPERSTEP_ID_KEY);
    } else if (e instanceof IllegalArgumentException) {
      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
      this.response = illegalArgumentMessage != null ?
        illegalArgumentMessage : "Invalid parameters.";
    } else if (e instanceof FileNotFoundException) {
      // Must be tested before the generic IOException below.
      this.statusCode = HttpURLConnection.HTTP_NOT_FOUND;
      this.response = "File not found on the server. Please ensure this " +
        "vertex/master was debugged.";
    } else if (e instanceof IOException ||
      e instanceof InstantiationException ||
      e instanceof IllegalAccessException ||
      e instanceof ClassNotFoundException) {
      this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
      this.response = "Internal Server Error.";
    } else {
      LOG.error("Unknown Exception: " + e.toString());
      this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
      this.response = "Unknown exception occured.";
    }
  }

  /**
   * Implement this method in inherited classes. This method MUST set statusCode
   * and response (or responseBytes) class members appropriately. In case the
   * Content type is not JSON, must specify the new Content type. Default type
   * is application/json. Non-200 Status is automatically assigned text/plain.
   *
   * @param httpExchange the http exchange object within which the parameters
   *        will be set.
   * @param paramMap map of parameters.
   */
  public abstract void processRequest(HttpExchange httpExchange,
    Map<String, String> paramMap);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java new file mode 100644 index 0000000..d48d5ca --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.debugger.gui; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.io.IOUtils; +import org.apache.giraph.debugger.utils.AggregatedValueWrapper; +import org.apache.giraph.debugger.utils.DebuggerUtils; +import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace; +import org.apache.giraph.debugger.utils.ExceptionWrapper; +import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper; +import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper; +import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper; +import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper.NeighborWrapper; +import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper.OutgoingMessageWrapper; +import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper; +import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper.ExtendedOutgoingMessageWrapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Utility methods for Debugger Server. + */ +@SuppressWarnings("rawtypes") +public class ServerUtils { + /** + * String for specifying the job id parameter. 
+ */ + public static final String JOB_ID_KEY = "jobId"; + /** + * String for specifying the vertex id parameter. + */ + public static final String VERTEX_ID_KEY = "vertexId"; + /** + * String for specifying the superstep id parameter. + */ + public static final String SUPERSTEP_ID_KEY = "superstepId"; + /** + * String for specifying the type of integrity violation parameter. + */ + public static final String INTEGRITY_VIOLATION_TYPE_KEY = "type"; + /** + * String for specifying the task id. + */ + public static final String TASK_ID_KEY = "taskId"; + /** + * String for specifying the trace type, i.e., {@link DebugTrace}. + */ + public static final String VERTEX_TEST_TRACE_TYPE_KEY = "traceType"; + /** + * String for specifying the adjacency list parameter. + */ + public static final String ADJLIST_KEY = "adjList"; + + /** + * Logger for this class. + */ + private static final Logger LOG = Logger.getLogger(ServerUtils.class); + + /** + * Cached FileSystem opened by {@link #getFileSystem()}. + */ + private static FileSystem FILE_SYSTEM_CACHED; + + /** + * Private constructor to disallow construction. + */ + private ServerUtils() { } + + /** + * Returns parameters of the URL in a hash map. For instance, + * http://localhost:9000/?key1=val1&key2=val2&key3=val3. + * @param rawUrl url with the parameters attached, which will be parsed. + * @return the parameters on the url. + */ + public static Map<String, String> getUrlParams(String rawUrl) + throws UnsupportedEncodingException { + HashMap<String, String> paramMap = Maps.newHashMap(); + + if (rawUrl != null) { + String[] params = rawUrl.split("&"); + for (String param : params) { + String[] parts = param.split("="); + String paramKey = URLDecoder.decode(parts[0], "UTF-8"); + String paramValue = URLDecoder.decode(parts[1], "UTF-8"); + paramMap.put(paramKey, paramValue); + } + } + return paramMap; + } + + /** + * Returns the HDFS FileSystem reference. 
Note: We assume that the classpath + * contains the Hadoop's conf directory or the core-site.xml and hdfs-site.xml + * configuration directories. + * @return a {@link FileSystem} object to be used to read from HDFS. + */ + public static FileSystem getFileSystem() throws IOException { + if (FILE_SYSTEM_CACHED == null) { + Configuration configuration = new Configuration(); + FILE_SYSTEM_CACHED = FileSystem.get(configuration); + } + return FILE_SYSTEM_CACHED; + } + + /** + * @param jobId id of the job, whose jar path will be returned. + * @return a url wrapped inside an array for convenience. + */ + public static URL[] getCachedJobJarPath(String jobId) { + // read the jar signature file under the TRACE_ROOT/jobId/ + Path jarSignaturePath = new Path(DebuggerUtils.getTraceFileRoot(jobId) + + "/" + "jar.signature"); + try { + FileSystem fs = getFileSystem(); + try (FSDataInputStream jarSignatureInput = fs.open(jarSignaturePath)) { + List<String> lines = IOUtils.readLines(jarSignatureInput); + if (lines.size() > 0) { + String jarSignature = lines.get(0); + // check if jar is already in JARCACHE_LOCAL + File localFile = new File(DebuggerUtils.JARCACHE_LOCAL + "/" + + jarSignature + ".jar"); + if (!localFile.exists()) { + // otherwise, download from HDFS + Path hdfsPath = new Path(fs.getUri().resolve( + DebuggerUtils.JARCACHE_HDFS + "/" + jarSignature + ".jar")); + Logger.getLogger(ServerUtils.class).info( + "Copying from HDFS: " + hdfsPath + " to " + localFile); + localFile.getParentFile().mkdirs(); + fs.copyToLocalFile(hdfsPath, new Path(localFile.toURI())); + } + return new URL[] { localFile.toURI().toURL() }; + } + } + } catch (IOException e) { + // gracefully ignore if we failed to read the jar.signature + LOG.warn("An IOException is thrown but will be ignored: " + + e.toString()); + } + return new URL[0]; + } + + /** + * @param jobId id of the job. + * @param superstepNo superstep number. + * @param vertexId id of the vertex. 
+ * @param debugTrace must be one of VERTEX_* or INTEGRITY_VERTEX types. + * @return path of the vertex trace file on HDFS. + */ + public static String getVertexTraceFilePath(String jobId, long superstepNo, + String vertexId, DebugTrace debugTrace) { + assert EnumSet.of(DebugTrace.VERTEX_EXCEPTION, DebugTrace.VERTEX_REGULAR, + DebugTrace.INTEGRITY_VERTEX).contains(debugTrace); + return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId), String + .format(DebuggerUtils.getTraceFileFormat(debugTrace), superstepNo, + vertexId)); + } + + /** + * @param jobId id of the job. + * @param taskId id of the task. + * @param superstepNo superstep number. + * @param debugTrace must be INTEGRITY_MESSAGE. + * @return path of the vertex trace file on HDFS. + */ + public static String getIntegrityTraceFilePath(String jobId, String taskId, + long superstepNo, DebugTrace debugTrace) { + assert EnumSet.of(DebugTrace.INTEGRITY_MESSAGE_ALL).contains(debugTrace); + return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId), + String.format(DebuggerUtils.getTraceFileFormat(debugTrace), taskId, + superstepNo)); + } + + /** + * @param jobId id of the job. + * @param superstepNo superstep number. + * @param debugTrace must be of type MASTER_*. + * @return path of the master compute trace file on HDFS. + */ + public static String getMasterTraceFilePath(String jobId, long superstepNo, + DebugTrace debugTrace) { + assert EnumSet.of(DebugTrace.MASTER_ALL, DebugTrace.MASTER_EXCEPTION, + DebugTrace.MASTER_REGULAR).contains(debugTrace); + return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId), + String.format(DebuggerUtils.getTraceFileFormat(debugTrace), superstepNo)); + } + + /** + * Reads the protocol buffer trace corresponding to the given jobId, + * superstepNo and vertexId and returns the giraphScenarioWrapper. + * + * @param jobId + * : ID of the job debugged. + * @param superstepNo + * : Superstep number debugged. 
+ * @param vertexId + * - ID of the vertex debugged. Returns GiraphScenarioWrapper. + * @param debugTrace - Can be either any one of VERTEX_* and + * INTEGRITY_MESSAGE_SINGLE_VERTEX. + * @return the vertex scenario stored in the trace file represented as a + * {@link GiraphVertexScenarioWrapper} object. + */ + public static GiraphVertexScenarioWrapper readScenarioFromTrace(String jobId, + long superstepNo, String vertexId, DebugTrace debugTrace) + throws IOException, ClassNotFoundException, InstantiationException, + IllegalAccessException { + FileSystem fs = ServerUtils.getFileSystem(); + GiraphVertexScenarioWrapper giraphScenarioWrapper = + new GiraphVertexScenarioWrapper(); + EnumSet<DebugTrace> enumSet = EnumSet.of(debugTrace); + if (debugTrace == DebugTrace.VERTEX_ALL) { + enumSet = EnumSet.of(DebugTrace.VERTEX_REGULAR, + DebugTrace.VERTEX_EXCEPTION, DebugTrace.INTEGRITY_VERTEX, + DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX); + } + // Loops through all possible debug traces and returns the first one found. + for (DebugTrace enumValue : enumSet) { + String traceFilePath = ServerUtils.getVertexTraceFilePath(jobId, + superstepNo, vertexId, enumValue); + try { + // If scenario is found, return it. + giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath, + getCachedJobJarPath(jobId)); + return giraphScenarioWrapper; + } catch (FileNotFoundException e) { + // Ignore the exception since we will try reading another traceType + // again. + LOG.info("readScenarioFromTrace: File not found. Ignoring."); + } + } + // None of the debugTrace types were found. Throw exception. + throw new FileNotFoundException("Debug Trace not found."); + } + + /** + * Reads the master protocol buffer trace corresponding to the given jobId and + * superstepNo and returns the GiraphMasterScenarioWrapper object. + * + * @param jobId + * : ID of the job debugged. + * @param superstepNo + * : Superstep number debugged. 
+ * @param debugTrace - Can be either MASTER_REGULAR, MASTER_EXCEPTION OR + * MASTER_ALL. In case of MASTER_ALL, returns whichever trace is + * available. + * @return the master scenario stored in the trace file represented as a + * {@link GiraphMasterScenarioWrapper} object. + */ + public static GiraphMasterScenarioWrapper readMasterScenarioFromTrace( + String jobId, long superstepNo, DebugTrace debugTrace) throws IOException, + ClassNotFoundException, InstantiationException, IllegalAccessException { + if (!EnumSet.of(DebugTrace.MASTER_ALL, DebugTrace.MASTER_EXCEPTION, + DebugTrace.MASTER_REGULAR).contains(debugTrace)) { + // Throw exception for unsupported debug trace. + throw new IllegalArgumentException( + "DebugTrace type is invalid. Use REGULAR, EXCEPTION or ALL_VERTICES"); + } + FileSystem fs = ServerUtils.getFileSystem(); + GiraphMasterScenarioWrapper giraphScenarioWrapper = + new GiraphMasterScenarioWrapper(); + // For each superstep, there is either a "regular" master trace (saved in + // master_reg_stp_i.tr files), or an "exception" master trace (saved in + // master_err_stp_i.tr files). We first check to see if a regular master + // trace is available. If not, then we check to see if an exception master + // trace is available. + if (debugTrace == DebugTrace.MASTER_REGULAR || + debugTrace == DebugTrace.MASTER_ALL) { + String traceFilePath = ServerUtils.getMasterTraceFilePath(jobId, + superstepNo, DebugTrace.MASTER_REGULAR); + try { + giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath, + getCachedJobJarPath(jobId)); + // If scenario is found, return it. + return giraphScenarioWrapper; + } catch (FileNotFoundException e) { + // If debugTrace was null, ignore this exception since + // we will try reading exception trace later. + if (debugTrace == DebugTrace.MASTER_ALL) { + LOG.info("readMasterScenarioFromTrace: Regular file not found. " + + "Ignoring."); + } else { + throw e; + } + } + } + // This code is reached only when debugTrace = exception or null. 
+ // In case of null, it is only reached when regular trace is not found + // already. + String traceFilePath = ServerUtils.getMasterTraceFilePath(jobId, + superstepNo, DebugTrace.MASTER_EXCEPTION); + giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath, + getCachedJobJarPath(jobId)); + return giraphScenarioWrapper; + } + + /** + * @param jobId id of the job. + * @param taskId id of the task. + * @param superstepNo superstep number. + * @return the {@linke MsgIntegrityViolationWrapper} from trace file. + */ + public static MsgIntegrityViolationWrapper readMsgIntegrityViolationFromTrace( + String jobId, String taskId, long superstepNo) throws IOException, + ClassNotFoundException, InstantiationException, IllegalAccessException { + FileSystem fs = ServerUtils.getFileSystem(); + String traceFilePath = ServerUtils.getIntegrityTraceFilePath(jobId, taskId, + superstepNo, DebugTrace.INTEGRITY_MESSAGE_ALL); + MsgIntegrityViolationWrapper msgIntegrityViolationWrapper = + new MsgIntegrityViolationWrapper(); + msgIntegrityViolationWrapper.loadFromHDFS(fs, traceFilePath, + getCachedJobJarPath(jobId)); + return msgIntegrityViolationWrapper; + } + + /** + * @param jobId id of the job. + * @param superstepNo superstep number. + * @param vertexId id of the vertex. + * @return the vertex integrity data from the trace file stored inside + * {@link GiraphVertexScenarioWrapper}. 
+ */ + public static GiraphVertexScenarioWrapper + readVertexIntegrityViolationFromTrace(String jobId, long superstepNo, + String vertexId) throws IOException, + ClassNotFoundException, InstantiationException, IllegalAccessException { + FileSystem fs = ServerUtils.getFileSystem(); + String traceFilePath = ServerUtils.getVertexTraceFilePath(jobId, + superstepNo, vertexId, DebugTrace.INTEGRITY_VERTEX); + GiraphVertexScenarioWrapper giraphScenarioWrapper = + new GiraphVertexScenarioWrapper(); + giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath); + return giraphScenarioWrapper; + } + + /** + * Converts a Giraph Scenario (giraphScenarioWrapper object) to JSON + * (JSONObject) + * + * @param giraphScenarioWrapper Giraph Scenario object. + * @return scenario data stored as json. + */ + public static JSONObject scenarioToJSON( + GiraphVertexScenarioWrapper giraphScenarioWrapper) throws JSONException { + VertexContextWrapper contextWrapper = giraphScenarioWrapper + .getContextWrapper(); + JSONObject scenarioObj = new JSONObject(); + scenarioObj.put("vertexId", contextWrapper.getVertexIdWrapper()); + scenarioObj.put("vertexValue", contextWrapper.getVertexValueAfterWrapper()); + JSONObject outgoingMessagesObj = new JSONObject(); + JSONArray neighborsList = new JSONArray(); + // Add outgoing messages. + for (Object outgoingMessage : contextWrapper.getOutgoingMessageWrappers()) { + OutgoingMessageWrapper outgoingMessageWrapper = + (OutgoingMessageWrapper) outgoingMessage; + outgoingMessagesObj.put(outgoingMessageWrapper.getDestinationId(). + toString(), outgoingMessageWrapper.getMessage().toString()); + } + // Add incoming messages. + ArrayList<String> incomingMessagesList = new ArrayList<String>(); + for (Object incomingMessage : contextWrapper.getIncomingMessageWrappers()) { + incomingMessagesList.add(incomingMessage.toString()); + } + // Add neighbors. 
+ for (Object neighbor : contextWrapper.getNeighborWrappers()) { + JSONObject neighborObject = new JSONObject(); + NeighborWrapper neighborWrapper = (NeighborWrapper) neighbor; + neighborObject.put("neighborId", neighborWrapper.getNbrId()); + neighborObject.put("edgeValue", neighborWrapper.getEdgeValue()); + neighborsList.put(neighborObject); + } + scenarioObj.put("outgoingMessages", outgoingMessagesObj); + scenarioObj.put("incomingMessages", incomingMessagesList); + scenarioObj.put("neighbors", neighborsList); + // Add exception, if present. + if (giraphScenarioWrapper.hasExceptionWrapper()) { + JSONObject exceptionObj = new JSONObject(); + ExceptionWrapper exceptionWrapper = giraphScenarioWrapper + .getExceptionWrapper(); + exceptionObj.put("message", exceptionWrapper.getErrorMessage()); + exceptionObj.put("stackTrace", exceptionWrapper.getStackTrace()); + scenarioObj.put("exception", exceptionObj); + } + JSONObject aggregateObj = new JSONObject(); + for (Object aggregatedValue : contextWrapper + .getCommonVertexMasterContextWrapper().getPreviousAggregatedValues()) { + AggregatedValueWrapper aggregatedValueWrapper = + (AggregatedValueWrapper) aggregatedValue; + aggregateObj.put(aggregatedValueWrapper.getKey(), + aggregatedValueWrapper.getValue()); + } + scenarioObj.put("aggregators", aggregateObj); + return scenarioObj; + } + + /** + * Converts the message integrity violation wrapper to JSON. + * + * @param msgIntegrityViolationWrapper {@link MsgIntegrityViolationWrapper} + * object. + * @return message integrity violation data stored as json. 
+ */ + public static JSONObject msgIntegrityToJson( + MsgIntegrityViolationWrapper msgIntegrityViolationWrapper) + throws JSONException { + JSONObject scenarioObj = new JSONObject(); + ArrayList<JSONObject> violationsList = new ArrayList<JSONObject>(); + scenarioObj.put("superstepId", + msgIntegrityViolationWrapper.getSuperstepNo()); + for (Object msgWrapper : msgIntegrityViolationWrapper + .getExtendedOutgoingMessageWrappers()) { + ExtendedOutgoingMessageWrapper extendedOutgoingMessageWrapper = + (ExtendedOutgoingMessageWrapper) msgWrapper; + JSONObject violationObj = new JSONObject(); + violationObj.put("srcId", extendedOutgoingMessageWrapper.getSrcId()); + violationObj.put("destinationId", + extendedOutgoingMessageWrapper.getDestinationId()); + violationObj.put("message", extendedOutgoingMessageWrapper.getMessage()); + violationsList.add(violationObj); + } + scenarioObj.put("violations", violationsList); + return scenarioObj; + } + + /** + * Converts the vertex integrity violation wrapper to JSON. + * + * @param giraphVertexScenarioWrapper {@link GiraphVertexScenarioWrapper} + * object storing the vertex value violation data. + * @return vertex integrity violation data stored as json. + */ + public static JSONObject vertexIntegrityToJson( + GiraphVertexScenarioWrapper giraphVertexScenarioWrapper) + throws JSONException { + JSONObject scenarioObj = new JSONObject(); + VertexContextWrapper vertexContextWrapper = giraphVertexScenarioWrapper + .getContextWrapper(); + scenarioObj.put("vertexId", vertexContextWrapper.getVertexIdWrapper()); + scenarioObj.put("vertexValue", + vertexContextWrapper.getVertexValueAfterWrapper()); + return scenarioObj; + } + + /** + * @param jobId id of the job. + * @param superstepNo superstep number. + * @param debugTrace type of vertex trace files. + * @return a list of vertex Ids that were debugged in the given superstep by + * reading (the file names of) the debug traces on HDFS. 
File names follow the + * <prefix>_stp_<superstepNo>_vid_<vertexId>.tr naming convention. + */ + public static List<String> getVerticesDebugged(String jobId, + long superstepNo, DebugTrace debugTrace) throws IOException { + ArrayList<String> vertexIds = new ArrayList<String>(); + FileSystem fs = ServerUtils.getFileSystem(); + String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId); + // Use this regex to match the file name and capture the vertex id. + String regex = String.format(DebuggerUtils.getTraceFileFormat(debugTrace), + superstepNo, "(.*?)"); + Pattern p = Pattern.compile(regex); + Path pt = new Path(traceFileRoot); + FileStatus[] fileStatuses = null; + // Hadoop listStatus returns null when path is not found. + fileStatuses = fs.listStatus(pt); + if (fileStatuses == null) { + throw new FileNotFoundException("Debug trace file not found."); + } + // Iterate through each file in this diFilerectory and match the regex. + for (FileStatus fileStatus : fileStatuses) { + String fileName = fileStatus.getPath().getName(); + Matcher m = p.matcher(fileName); + // Add this vertex id if there is a match. + if (m.find()) { + // VERTEX_ALL debug trace has one group to match the prefix -reg|err. + // FIXME XXX this is terrible: we pretend to know nothing about the + // patterns defined in DebuggerUtils#getTraceFileFormat(), but all of a + // sudden we're using inside knowledge to extract the vertex id part. :S + vertexIds.add(m.group(debugTrace == DebugTrace.VERTEX_ALL ? 2 : 1)); + } + } + return vertexIds; + } + + /** + * @param jobId id of the job. + * @param superstepNo superstep number. + * @param debugTrace must be one of INTEGRITY_* types. + * @return the IDs of all the tasks that caused the given integrity violation. 
+ */ + public static List<String> getTasksWithIntegrityViolations(String jobId, + long superstepNo, DebugTrace debugTrace) throws IOException { + assert EnumSet.of(DebugTrace.INTEGRITY_MESSAGE_ALL, + DebugTrace.INTEGRITY_VERTEX).contains(debugTrace); + ArrayList<String> taskIds = new ArrayList<String>(); + FileSystem fs = ServerUtils.getFileSystem(); + String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId); + // Use this regex to match the file name and capture the vertex id. + String regex = String.format(DebuggerUtils.getTraceFileFormat(debugTrace), + "(.*?)", superstepNo); + Pattern p = Pattern.compile(regex); + Path pt = new Path(traceFileRoot); + FileStatus[] fileStatuses = null; + // Hadoop listStatus returns null when path is not found. + fileStatuses = fs.listStatus(pt); + if (fileStatuses == null) { + throw new FileNotFoundException("Debug trace file not found."); + } + // Iterate through each file in this directory and match the regex. + for (FileStatus fileStatus : fileStatuses) { + String fileName = fileStatus.getPath().getName(); + Matcher m = p.matcher(fileName); + // Add this vertex id if there is a match. + if (m.find()) { + taskIds.add(m.group(1)); + } + } + return taskIds; + } + + /** + * @param jobId id of the job. + * @return the list of supersteps for which there is an exception or regular + * trace. + */ + public static List<Long> getSuperstepsDebugged(String jobId) + throws IOException { + Set<Long> superstepIds = Sets.newHashSet(); + FileSystem fs = ServerUtils.getFileSystem(); + String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId); + // Use this regex to match the file name and capture the vertex id. + String regex = "(reg|err|msg_intgrty|vv_intgrty)_stp_(.*?)_vid_(.*?).tr$"; + Pattern p = Pattern.compile(regex); + Path pt = new Path(traceFileRoot); + // Iterate through each file in this directory and match the regex. 
+ for (FileStatus fileStatus : fs.listStatus(pt)) { + String fileName = fileStatus.getPath().getName(); + Matcher m = p.matcher(fileName); + // Add this vertex id if there is a match. + if (m.find()) { + superstepIds.add(Long.parseLong(m.group(2))); + } + } + return Lists.newArrayList(superstepIds); + } + + /** + * @param jobId id of the job. + * @return the list of supersteps for which there is an exception or regular + * trace. + */ + public static List<Long> getSuperstepsMasterDebugged(String jobId) + throws IOException { + Set<Long> superstepIds = Sets.newHashSet(); + FileSystem fs = ServerUtils.getFileSystem(); + String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId); + // Use this regex to match the file name and capture the vertex id. + String regex = "master_.*_stp_(\\d+?).tr$"; + Pattern p = Pattern.compile(regex); + Path pt = new Path(traceFileRoot); + // Iterate through each file in this directory and match the regex. + for (FileStatus fileStatus : fs.listStatus(pt)) { + String fileName = fileStatus.getPath().getName(); + Matcher m = p.matcher(fileName); + // Add this vertex id if there is a match. + if (m.find()) { + superstepIds.add(Long.parseLong(m.group(1))); + } + } + return Lists.newArrayList(superstepIds); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java new file mode 100644 index 0000000..c3aaec0 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes to run Giraph debugger GUI. + */ +package org.apache.giraph.debugger.gui; http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java new file mode 100644 index 0000000..b6656fa --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java @@ -0,0 +1,650 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.instrumenter; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Type; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.debugger.DebugConfig; +import org.apache.giraph.debugger.utils.DebuggerUtils; +import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace; +import org.apache.giraph.debugger.utils.ExceptionWrapper; +import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper; +import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper; +import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.AbstractComputation; +import org.apache.giraph.graph.Computation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Class that intercepts call to the AbstractComputation's exposed methods for + * GiraphDebugger. 
+ * + * @param <I> + * Vertex id + * @param <V> + * Vertex data + * @param <E> + * Edge data + * @param <M1> + * Incoming message type + * @param <M2> + * Outgoing message type + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public abstract class AbstractInterceptingComputation< + I extends WritableComparable, V extends Writable, E extends Writable, + M1 extends Writable, M2 extends Writable> + extends AbstractComputation<I, V, E, M1, M2> { + + + /** + * Configuration key for the class name of the class that extends DebugConfig. + */ + public static final String CONFIG_CLASS_KEY = "giraph.debugger.configClass"; + + /** + * Giraph configuration for specifying the DebugConfig class. + */ + public static final StrConfOption DEBUG_CONFIG_CLASS = new StrConfOption( + CONFIG_CLASS_KEY, DebugConfig.class.getName(), + "The name of the Debug Config class for the computation (e.g. " + + "org.apache.giraph.debugger.examples.SimpleShortestPathsDebugConfig)."); + + /** + * Logger for this class. + */ + protected static final Logger LOG = Logger + .getLogger(AbstractInterceptingComputation.class); + + /** + * A flag to indicate whether this Computation class was already initialized. + */ + protected static boolean IS_INITIALIZED; + /** + * Whether DEBUG_CONFIG tells to check message constraints. + */ + protected static boolean SHOULD_CHECK_MESSAGE_INTEGRITY; + /** + * Whether DEBUG_CONFIG tells to check vertex value constraints. + */ + protected static boolean SHOULD_CHECK_VERTEX_VALUE_INTEGRITY; + /** + * Whether DEBUG_CONFIG tells to catch exceptions. + */ + protected static boolean SHOULD_CATCH_EXCEPTIONS; + + /** + * Configuration key for the path to the jar signature. + */ + private static final String JAR_SIGNATURE_KEY = + "giraph.debugger.jarSignature"; + + /** + * A constant to limit the number of violations to log. + */ + private static int NUM_VIOLATIONS_TO_LOG = 5; + /** + * A constant to limit the number of vertices to log. 
+ */ + private static int NUM_VERTICES_TO_LOG = 5; + /** + * A counter for number of vertices already logged. + */ + private static int NUM_VERTICES_LOGGED = 0; + /** + * A counter for number of vertex violations already logged. + */ + private static int NUM_VERTEX_VIOLATIONS_LOGGED = -1; + /** + * A counter for number of message violations already logged. + */ + private static int NUM_MESSAGE_VIOLATIONS_LOGGED = -1; + + /** + * DebugConfig instance to be used for debugging. + */ + private static DebugConfig DEBUG_CONFIG; + + /** + * The vertex id type as in the I of Giraph's Computation<I,V,E,M1,M2>. + */ + private static Type VERTEX_ID_CLASS; + /** + * The vertex value type as in the V of Giraph's Computation<I,V,E,M1,M2>. + */ + private static Type VERTEX_VALUE_CLASS; + /** + * The edge value type as in the E of Giraph's Computation<I,V,E,M1,M2>. + */ + private static Type EDGE_VALUE_CLASS; + /** + * The incoming message type as in the M1 of Giraph's + * Computation<I,V,E,M1,M2>. + */ + private static Type INCOMING_MESSAGE_CLASS; + /** + * The outgoing message type as in the M2 of Giraph's + * Computation<I,V,E,M1,M2>. + */ + private static Type OUTGOING_MESSAGE_CLASS; + + /** + * Contains previous aggregators that are available in the beginning of the + * superstep.In Giraph, these aggregators are immutable. NOTE: We currently + * only capture aggregators that are read by at least one vertex. If we want + * to capture all aggregators we need to change Giraph code to be get access + * to them. + */ + private static CommonVertexMasterInterceptionUtil + COMMON_VERTEX_MASTER_INTERCEPTING_UTIL; + + /** + * Whether or not this vertex was configured to be debugged. If so we will + * intercept its outgoing messages. + */ + private boolean shouldDebugVertex; + /** + * Whether to stop intercepting compute() for the remaining vertices. 
+ */ + private boolean shouldStopInterceptingVertex; + + /** + * For vertices that are configured to be debugged, we construct a + * GiraphVertexScenarioWrapper in the beginning and use it to intercept + * outgoing messages + */ + private GiraphVertexScenarioWrapper<I, V, E, M1, M2> + giraphVertexScenarioWrapperForRegularTraces; + + /** + * If a vertex has violated a message value constraint when it was sending a + * message we set this to true so that at the inside interceptComputeEnd() + * method we make sure we save a vertexScenario trace for it. + */ + private boolean hasViolatedMsgValueConstraint; + /** + * Stores the value of a vertex before the compute method is called. If a + * vertex throws an exception, or violates a vertex or message value + * constraint, then we use this value as the previous vertex value when we + * save a vertexScenario trace for it. + */ + private V previousVertexValue; + /** + * DataOutputBuffer for holding the previous vertex value. + */ + private DataOutputBuffer previousVertexValueOutputBuffer = + new DataOutputBuffer(); + /** + * DataInputBuffer for cloning what was preserved for previous vertex value. + */ + private DataInputBuffer previousVertexValueInputBuffer = + new DataInputBuffer(); + /** + * We keep the vertex under compute in case some functions need it, e.g., + * sendMessage(). + */ + private Vertex<I, V, E> currentVertexUnderCompute; + /** + * The wrapped instance of message integrity violation. + */ + private MsgIntegrityViolationWrapper<I, M2> msgIntegrityViolationWrapper; + + /** + * Provides a way to access the actual Computation class. + * @return The actual Computation class + */ + public abstract Class<? extends Computation<I, V, E, ? extends Writable, + ? extends Writable>> getActualTestedClass(); + + /** + * Initializes this class to start debugging. 
+ */ + protected final synchronized void + initializeAbstractInterceptingComputation() { + if (IS_INITIALIZED) { + return; // don't initialize twice + } + IS_INITIALIZED = true; + COMMON_VERTEX_MASTER_INTERCEPTING_UTIL = + new CommonVertexMasterInterceptionUtil( + getContext().getJobID().toString()); + String debugConfigClassName = DEBUG_CONFIG_CLASS.get(getConf()); + LOG.info("initializing debugConfigClass: " + debugConfigClassName); + Class<?> clazz; + try { + clazz = Class.forName(debugConfigClassName); + DEBUG_CONFIG = (DebugConfig<I, V, E, M1, M2>) clazz.newInstance(); + DEBUG_CONFIG.readConfig(getConf(), getTotalNumVertices(), + getContext().getJobID().getId()); + VERTEX_ID_CLASS = getConf().getVertexIdClass(); + VERTEX_VALUE_CLASS = getConf().getVertexValueClass(); + EDGE_VALUE_CLASS = getConf().getEdgeValueClass(); + INCOMING_MESSAGE_CLASS = getConf().getIncomingMessageValueClass(); + OUTGOING_MESSAGE_CLASS = getConf().getOutgoingMessageValueClass(); + // Set limits from DebugConfig + NUM_VERTICES_TO_LOG = DEBUG_CONFIG.getNumberOfVerticesToLog(); + NUM_VIOLATIONS_TO_LOG = DEBUG_CONFIG.getNumberOfViolationsToLog(); + // Reset counters + NUM_MESSAGE_VIOLATIONS_LOGGED = 0; + NUM_VERTEX_VIOLATIONS_LOGGED = 0; + NUM_VERTICES_LOGGED = 0; + // Cache DebugConfig flags + SHOULD_CATCH_EXCEPTIONS = DEBUG_CONFIG.shouldCatchExceptions(); + SHOULD_CHECK_VERTEX_VALUE_INTEGRITY = + DEBUG_CONFIG.shouldCheckVertexValueIntegrity(); + SHOULD_CHECK_MESSAGE_INTEGRITY = + DEBUG_CONFIG.shouldCheckMessageIntegrity(); + } catch (InstantiationException | ClassNotFoundException | + IllegalAccessException e) { + LOG.error("Could not create a new DebugConfig instance of " + + debugConfigClassName); + e.printStackTrace(); + throw new RuntimeException(e); + } + if (getWorkerContext().getMyWorkerIndex() == getWorkerContext() + .getWorkerCount() - 1) { + // last worker records jar signature if necessary + String jarSignature = getConf().get(JAR_SIGNATURE_KEY); + if (jarSignature != null) { 
+ Path jarSignaturePath = new Path( + DebuggerUtils.getTraceFileRoot(COMMON_VERTEX_MASTER_INTERCEPTING_UTIL + .getJobId()) + "/" + "jar.signature"); + LOG.info("Recording jar signature (" + jarSignature + ") at " + + jarSignaturePath); + FileSystem fs = COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getFileSystem(); + try { + if (!fs.exists(jarSignaturePath)) { + OutputStream f = fs.create(jarSignaturePath, + true).getWrappedStream(); + IOUtils.write(jarSignature, f); + f.close(); + } + } catch (IOException e) { + // When multiple workers try to write the jar.signature, some of them + // may cause + // AlreadyBeingCreatedException to be thrown, which we ignore. + e.printStackTrace(); + } + } + } + LOG.info("done initializing debugConfigClass: " + debugConfigClassName); + }

  /**
   * Records the vertex's current value as the "previous" value by writing it
   * into {@code previousVertexValueOutputBuffer}. The buffer is reset first,
   * so only the most recent snapshot is kept.
   *
   * @param vertex the vertex whose value is recorded
   * @throws IOException if writing the value into the buffer fails
   */
  private void keepPreviousVertexValue(Vertex<I, V, E> vertex)
    throws IOException {
    previousVertexValueOutputBuffer.reset();
    vertex.getValue().write(previousVertexValueOutputBuffer);
  }

  /**
   * Rebuilds the vertex value captured by
   * {@link #keepPreviousVertexValue(Vertex)} from the snapshot buffer.
   *
   * @return the previous vertex value. The same instance is reused (and
   *         overwritten) on every call, so callers must not hold on to it
   *         across calls.
   * @throws IOException if reading the value back from the buffer fails
   */
  private V getPreviousVertexValue() throws IOException {
    previousVertexValueInputBuffer.reset(
      previousVertexValueOutputBuffer.getData(),
      previousVertexValueOutputBuffer.getLength());
    // Lazily create the holder instance that readFields() fills in.
    if (previousVertexValue == null) {
      previousVertexValue = getConf().createVertexValue();
    }
    previousVertexValue.readFields(previousVertexValueInputBuffer);
    return previousVertexValue;
  }

  /**
   * @return true once any of the per-superstep limits has been reached: the
   *         number of logged vertices, of vertex value violations, or of
   *         message violations.
   */
  private boolean hasInterceptedEnough() {
    return NUM_VERTICES_LOGGED >= NUM_VERTICES_TO_LOG ||
      NUM_VERTEX_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG ||
      NUM_MESSAGE_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG;
  }

  /**
   * Called before {@link Computation#preSuperstep()}. Resets the per-superstep
   * counters and, when message integrity is being checked, prepares a fresh
   * message integrity violation wrapper for this superstep.
   *
   * @return true if compute() does not need to be intercepted for this
   *         superstep.
   */
  protected final boolean interceptPreSuperstepBegin() {
    NUM_VERTICES_LOGGED = 0;
    NUM_VERTEX_VIOLATIONS_LOGGED = 0;
    NUM_MESSAGE_VIOLATIONS_LOGGED = 0;
    if (!DEBUG_CONFIG.shouldDebugSuperstep(getSuperstep()) ||
      hasInterceptedEnough()) {
      shouldStopInterceptingVertex = true;
      return true;
    }
    if (SHOULD_CHECK_VERTEX_VALUE_INTEGRITY) {
      LOG.info("creating a vertexValueViolationWrapper. superstepNo: " +
        getSuperstep());
    }

    if (SHOULD_CHECK_MESSAGE_INTEGRITY) {
      LOG.info("creating a msgIntegrityViolationWrapper. superstepNo: " +
        getSuperstep());
      msgIntegrityViolationWrapper = new MsgIntegrityViolationWrapper<>(
        (Class<I>) VERTEX_ID_CLASS, (Class<M2>) OUTGOING_MESSAGE_CLASS);
      msgIntegrityViolationWrapper.setSuperstepNo(getSuperstep());
    }

    shouldStopInterceptingVertex = false;
    return false;
  }

  /**
   * Called immediately when the compute() method is entered. Initializes data
   * that will be required for debugging throughout the rest of the compute
   * function.
   *
   * @param vertex The vertex that's about to be computed.
   * @param messages The incoming messages for the vertex.
   * @throws IOException if capturing the scenario or the previous vertex
   *         value fails
   */
  protected final void interceptComputeBegin(Vertex<I, V, E> vertex,
    Iterable<M1> messages) throws IOException {
    if (!IS_INITIALIZED) {
      // TODO: Sometimes Giraph doesn't call initialize() and directly calls
      // compute(). Here we guard against things not being initialized, which
      // was causing null pointer exceptions.
      // Find out when/why this happens.
      LOG.warn("interceptComputeBegin is called but debugConfig is null." +
        " Initializing AbstractInterceptingComputation again...");
      initializeAbstractInterceptingComputation();
    }
    // A vertex should be debugged if:
    // 1) the user configures the superstep to be debugged;
    // 2) the user configures the vertex to be debugged; and
    // 3) we have already debugged less than a threshold of vertices in this
    // superstep.
    shouldDebugVertex = NUM_VERTICES_LOGGED < NUM_VERTICES_TO_LOG &&
      DEBUG_CONFIG.shouldDebugVertex(vertex, getSuperstep());
    if (shouldDebugVertex) {
      giraphVertexScenarioWrapperForRegularTraces = getGiraphVertexScenario(
        vertex, vertex.getValue(), messages);
    }
    // Keep a reference to the current vertex only when necessary.
    if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
      NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) {
      currentVertexUnderCompute = vertex;
      hasViolatedMsgValueConstraint = false;
    }
    // Keep the previous value only when necessary: for exception traces, or
    // while either kind of integrity violation can still be logged.
    if (SHOULD_CATCH_EXCEPTIONS ||
      (SHOULD_CHECK_VERTEX_VALUE_INTEGRITY &&
        NUM_VERTEX_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) ||
      (SHOULD_CHECK_MESSAGE_INTEGRITY &&
        NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG)) {
      keepPreviousVertexValue(vertex);
    }
  }

  /**
   * Captures exception from {@link Computation#compute(Vertex, Iterable)} and
   * saves a scenario trace that carries the exception message and stack
   * trace together with the vertex value kept before compute().
   *
   * @param vertex The vertex that was being computed.
   * @param messages The incoming messages for the vertex.
   * @param e The exception thrown.
   * @throws IOException if the previous vertex value cannot be restored
   */
  protected final void interceptComputeException(Vertex<I, V, E> vertex,
    Iterable<M1> messages, Throwable e) throws IOException {
    LOG.info("Caught an exception. message: " + e.getMessage() +
      ". Saving a trace in HDFS.");
    GiraphVertexScenarioWrapper<I, V, E, M1, M2>
    giraphVertexScenarioWrapperForExceptionTrace = getGiraphVertexScenario(
      vertex, getPreviousVertexValue(), messages);
    ExceptionWrapper exceptionWrapper = new ExceptionWrapper(e.getMessage(),
      ExceptionUtils.getStackTrace(e));
    giraphVertexScenarioWrapperForExceptionTrace
      .setExceptionWrapper(exceptionWrapper);
    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
      giraphVertexScenarioWrapperForExceptionTrace, DebuggerUtils
        .getFullTraceFileName(DebugTrace.VERTEX_EXCEPTION,
          COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
          vertex.getId().toString()));
  }

  /**
   * Called after {@link Computation#compute(Vertex, Iterable)} to save the
   * regular trace (if this vertex is being debugged) and to check vertex and
   * message value integrity.
   *
   * @param vertex The vertex that was computed.
   * @param messages The incoming messages for the vertex.
   * @return true if enough has been captured in this superstep, i.e. compute()
   *         no longer needs to be intercepted; false otherwise.
   * @throws IOException if the previous vertex value cannot be restored
   */
  protected final boolean interceptComputeEnd(Vertex<I, V, E> vertex,
    Iterable<M1> messages) throws IOException {
    if (shouldDebugVertex) {
      // Reflect changes made by compute to scenario.
      giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
        .setVertexValueAfterWrapper(vertex.getValue());
      // Save vertex scenario.
      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
        giraphVertexScenarioWrapperForRegularTraces, DebuggerUtils
          .getFullTraceFileName(DebugTrace.VERTEX_REGULAR,
            COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
            vertex.getId().toString()));
      NUM_VERTICES_LOGGED++;
    }
    if (SHOULD_CHECK_VERTEX_VALUE_INTEGRITY &&
      NUM_VERTEX_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG &&
      !DEBUG_CONFIG.isVertexValueCorrect(vertex.getId(), vertex.getValue())) {
      initAndSaveGiraphVertexScenarioWrapper(vertex, messages,
        DebugTrace.INTEGRITY_VERTEX);
      NUM_VERTEX_VIOLATIONS_LOGGED++;
    }
    if (hasViolatedMsgValueConstraint) {
      initAndSaveGiraphVertexScenarioWrapper(vertex, messages,
        DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX);
      NUM_MESSAGE_VIOLATIONS_LOGGED++;
    }

    shouldStopInterceptingVertex = hasInterceptedEnough();
    return shouldStopInterceptingVertex;
  }

  /**
   * Called after {@link Computation#postSuperstep()} to save the captured
   * message integrity violations, if there are any.
   */
  protected final void interceptPostSuperstepEnd() {
    // The wrapper is only created by interceptPreSuperstepBegin() when the
    // superstep is actually debugged, so it may still be null here.
    if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
      msgIntegrityViolationWrapper != null &&
      msgIntegrityViolationWrapper.numMsgWrappers() > 0) {
      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
        msgIntegrityViolationWrapper, DebuggerUtils
          .getMessageIntegrityAllTraceFullFileName(getSuperstep(),
            COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), UUID.randomUUID()
              .toString()));
    }
  }

  /**
   * Builds a scenario for the given vertex, using the vertex value kept before
   * compute() as the "before" value, and saves it under the file name derived
   * from the given trace type.
   *
   * @param vertex The vertex that was computed.
   * @param messages The incoming messages for the vertex.
   * @param debugTrace The debug trace to save.
   * @throws IOException if the previous vertex value cannot be restored
   */
  private void initAndSaveGiraphVertexScenarioWrapper(Vertex<I, V, E> vertex,
    Iterable<M1> messages, DebugTrace debugTrace) throws IOException {
    GiraphVertexScenarioWrapper<I, V, E, M1, M2>
    giraphVertexScenarioWrapper = getGiraphVertexScenario(
      vertex, getPreviousVertexValue(), messages);
    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
      giraphVertexScenarioWrapper, DebuggerUtils.getFullTraceFileName(
        debugTrace, COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(),
        getSuperstep(), vertex.getId().toString()));
  }

  /**
   * We pass the previous vertex value to assign as an argument because for some
   * traces we capture the context lazily and store the previous value
   * temporarily in an object. In those cases the previous value is not equal to
   * the current value of the vertex. And sometimes it is equal to the current
   * value.
   *
   * @param vertex The vertex the scenario will capture.
   * @param previousVertexValueToAssign The previous vertex value.
   * @param messages The incoming messages for this superstep.
   * @return A scenario for the given vertex, populated with its id, edges,
   *         incoming messages and before/after values.
   * @throws IOException
   */
  private GiraphVertexScenarioWrapper<I, V, E, M1, M2> getGiraphVertexScenario(
    Vertex<I, V, E> vertex, V previousVertexValueToAssign,
    Iterable<M1> messages) throws IOException {
    GiraphVertexScenarioWrapper<I, V, E, M1, M2> giraphVertexScenarioWrapper =
      new GiraphVertexScenarioWrapper(
        getActualTestedClass(), (Class<I>) VERTEX_ID_CLASS,
        (Class<V>) VERTEX_VALUE_CLASS, (Class<E>) EDGE_VALUE_CLASS,
        (Class<M1>) INCOMING_MESSAGE_CLASS, (Class<M2>) OUTGOING_MESSAGE_CLASS);
    VertexContextWrapper contextWrapper =
      giraphVertexScenarioWrapper.getContextWrapper();
    contextWrapper
      .setVertexValueBeforeWrapper(previousVertexValueToAssign);
    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.initCommonVertexMasterContextWrapper(
      getConf(), getSuperstep(), getTotalNumVertices(), getTotalNumEdges());
    contextWrapper
      .setCommonVertexMasterContextWrapper(
        COMMON_VERTEX_MASTER_INTERCEPTING_UTIL
          .getCommonVertexMasterContextWrapper());
    giraphVertexScenarioWrapper.getContextWrapper().setVertexIdWrapper(
      vertex.getId());
    Iterable<Edge<I, E>> edges = vertex.getEdges();
    for (Edge<I, E> edge : edges) {
      giraphVertexScenarioWrapper.getContextWrapper().addNeighborWrapper(
        edge.getTargetVertexId(), edge.getValue());
    }
    for (M1 message : messages) {
      giraphVertexScenarioWrapper.getContextWrapper()
        .addIncomingMessageWrapper(message);
    }
    // Start with the current value as the "after" value; interceptComputeEnd()
    // overwrites it for regular traces once compute() has run.
    giraphVertexScenarioWrapper.getContextWrapper().setVertexValueAfterWrapper(
      vertex.getValue());
    return giraphVertexScenarioWrapper;
  }

  /**
   * First intercepts the sent message if necessary and then calls
   * AbstractComputation's sendMessage method.
   *
   * @param id
   *          Vertex id to send the message to
   * @param message
   *          Message data to send
   */
  @Override
  public void sendMessage(I id, M2 message) {
    if (!shouldStopInterceptingVertex) {
      if (shouldDebugVertex) {
        giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
          .addOutgoingMessageWrapper(id, message);
      }
      if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
        NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) {
        I senderId = currentVertexUnderCompute.getId();
        if (!DEBUG_CONFIG.isMessageCorrect(senderId, id, message,
          getSuperstep())) {
          msgIntegrityViolationWrapper.addMsgWrapper(senderId, id, message);
          NUM_MESSAGE_VIOLATIONS_LOGGED++;
          hasViolatedMsgValueConstraint = true;
        }
      }
    }
    super.sendMessage(id, message);
  }

  /**
   * First intercepts the sent messages to all edges if necessary and then
   * calls AbstractComputation's sendMessageToAllEdges method.
   *
   * @param vertex
   *          Vertex whose edges to send the message to.
   * @param message
   *          Message sent to all edges.
   */
  @Override
  public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
    if (!shouldStopInterceptingVertex) {
      if (shouldDebugVertex) {
        for (Edge<I, E> edge : vertex.getEdges()) {
          giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
            .addOutgoingMessageWrapper(edge.getTargetVertexId(), message);
        }
      }
      if (SHOULD_CHECK_MESSAGE_INTEGRITY) {
        I senderId = vertex.getId();
        for (Edge<I, E> edge : vertex.getEdges()) {
          // Stop checking as soon as the violation limit is reached.
          if (NUM_MESSAGE_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG) {
            break;
          }
          I id = edge.getTargetVertexId();
          if (!DEBUG_CONFIG.isMessageCorrect(senderId, id, message,
            getSuperstep())) {
            msgIntegrityViolationWrapper.addMsgWrapper(senderId, id, message);
            hasViolatedMsgValueConstraint = true;
            NUM_MESSAGE_VIOLATIONS_LOGGED++;
          }
        }
      }
    }
    super.sendMessageToAllEdges(vertex, message);
  }

  /**
   * Returns the aggregated value from the superclass and, while interception
   * is still active, records it with the common interception utility.
   *
   * @param <A> The type of the aggregator value.
   * @param name The name of the aggregator.
   * @return The value returned by the superclass implementation.
   */
  @Override
  public <A extends Writable> A getAggregatedValue(String name) {
    A retVal = super.<A>getAggregatedValue(name);
    if (!shouldStopInterceptingVertex) {
      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.addAggregatedValueIfNotExists(name,
        retVal);
    }
    return retVal;
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java new file mode 100644 index 0000000..d6ed908 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.instrumenter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.giraph.debugger.utils.DebuggerUtils; +import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace; +import org.apache.giraph.debugger.utils.ExceptionWrapper; +import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper; +import org.apache.giraph.master.MasterCompute; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; +

/**
 * Class that intercepts calls to {@link MasterCompute}'s exposed methods for
 * GiraphDebugger. For each call to the user's compute() it builds a
 * {@link GiraphMasterScenarioWrapper} and saves it as a trace, either when
 * compute() returns or when it throws.
 */
public abstract class AbstractInterceptingMasterCompute extends MasterCompute {

  /**
   * Logger for this class.
   */
  protected static final Logger LOG = Logger
    .getLogger(AbstractInterceptingMasterCompute.class);
  /**
   * The master scenario being captured. Re-created on every call to
   * {@link #interceptComputeBegin()}.
   */
  private GiraphMasterScenarioWrapper giraphMasterScenarioWrapper;
  /**
   * The utility for intercepting master computes. Created lazily on the first
   * call to {@link #interceptComputeBegin()}.
   */
  private CommonVertexMasterInterceptionUtil commonVertexMasterInterceptionUtil;

  /**
   * Called immediately as user's {@link MasterCompute#compute()} method is
   * entered. Starts a new master scenario and initializes the common context
   * (configuration, superstep number, total vertex and edge counts).
   */
  public void interceptComputeBegin() {
    LOG.info(this.getClass().getName() + ".interceptComputeBegin is called ");
    giraphMasterScenarioWrapper = new GiraphMasterScenarioWrapper(this
      .getClass().getName());
    if (commonVertexMasterInterceptionUtil == null) {
      commonVertexMasterInterceptionUtil = new
        CommonVertexMasterInterceptionUtil(getContext().getJobID().toString());
    }
    commonVertexMasterInterceptionUtil.initCommonVertexMasterContextWrapper(
      getConf(), getSuperstep(), getTotalNumVertices(), getTotalNumEdges());
    giraphMasterScenarioWrapper
      .setCommonVertexMasterContextWrapper(commonVertexMasterInterceptionUtil
        .getCommonVertexMasterContextWrapper());
  }

  /**
   * Intercepts the call to {@link MasterCompute#getAggregatedValue(String)} to
   * capture aggregator values at each superstep.
   *
   * @param <A>
   *          The type of the aggregator value.
   * @param name
   *          The name of the Giraph aggregator.
   * @return The aggregator value returned by the original
   *         {@link MasterCompute#getAggregatedValue(String)}.
   */
  @Intercept(renameTo = "getAggregatedValue")
  public <A extends Writable> A getAggregatedValueIntercept(String name) {
    A retVal = super.<A>getAggregatedValue(name);
    // The utility only exists once interceptComputeBegin() has run; if an
    // aggregator is read before that, return the value without recording it.
    if (commonVertexMasterInterceptionUtil != null) {
      commonVertexMasterInterceptionUtil.addAggregatedValueIfNotExists(name,
        retVal);
    }
    return retVal;
  }

  /**
   * Called when user's {@link MasterCompute#compute()} method throws an
   * exception. Attaches the exception message and stack trace to the current
   * scenario and saves it as a master exception trace.
   *
   * @param e
   *          exception thrown.
   */
  protected final void interceptComputeException(Exception e) {
    LOG.info("Caught an exception in user's MasterCompute. message: " +
      e.getMessage() + ". Saving a trace in HDFS.");
    ExceptionWrapper exceptionWrapper = new ExceptionWrapper(e.getMessage(),
      ExceptionUtils.getStackTrace(e));
    giraphMasterScenarioWrapper.setExceptionWrapper(exceptionWrapper);
    commonVertexMasterInterceptionUtil.saveScenarioWrapper(
      giraphMasterScenarioWrapper, DebuggerUtils.getFullMasterTraceFileName(
        DebugTrace.MASTER_EXCEPTION,
        commonVertexMasterInterceptionUtil.getJobId(), getSuperstep()));
  }

  /**
   * Called after user's {@link MasterCompute#compute()} method returns. Saves
   * the current scenario as a regular master trace.
   */
  public void interceptComputeEnd() {
    commonVertexMasterInterceptionUtil.saveScenarioWrapper(
      giraphMasterScenarioWrapper, DebuggerUtils.getFullMasterTraceFileName(
        DebugTrace.MASTER_REGULAR,
        commonVertexMasterInterceptionUtil.getJobId(), getSuperstep()));
  }

  /**
   * Intentionally empty: this class reads no state of its own.
   *
   * @param arg0 the input to read from (unused)
   * @throws IOException never thrown by this implementation
   */
  @Override
  public void readFields(DataInput arg0) throws IOException {
  }

  /**
   * Intentionally empty: this class writes no state of its own.
   *
   * @param arg0 the output to write to (unused)
   * @throws IOException never thrown by this implementation
   */
  @Override
  public void write(DataOutput arg0) throws IOException {
  }
}
