Modified: incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java?rev=1432294&r1=1432293&r2=1432294&view=diff ============================================================================== --- incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java (original) +++ incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java Fri Jan 11 21:35:03 2013 @@ -23,7 +23,12 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -127,7 +132,7 @@ public class MapReduceJobHistoryUpdater workflowSelectPS = connection.prepareStatement( - "SELECT workflowId FROM " + WORKFLOW_TABLE + " where workflowId = ?" + "SELECT workflowContext FROM " + WORKFLOW_TABLE + " where workflowId = ?" ); workflowPS = @@ -154,6 +159,8 @@ public class MapReduceJobHistoryUpdater "UPDATE " + WORKFLOW_TABLE + " SET " + + "workflowContext = ?, " + + "numJobsTotal = ?, " + "lastUpdateTime = ?, " + "duration = ? - (SELECT startTime FROM " + WORKFLOW_TABLE + @@ -597,6 +604,57 @@ public class MapReduceJobHistoryUpdater return context; } + public static void mergeEntries(Map<String, Set<String>> edges, List<WorkflowDagEntry> entries) { + if (entries == null) + return; + for (WorkflowDagEntry entry : entries) { + if (!edges.containsKey(entry.getSource())) + edges.put(entry.getSource(), new TreeSet<String>()); + Set<String> targets = edges.get(entry.getSource()); + targets.addAll(entry.getTargets()); + } + } + + public static WorkflowDag constructMergedDag(WorkflowContext workflowContext, WorkflowContext existingWorkflowContext) { + Map<String, Set<String>> edges = new TreeMap<String, Set<String>>(); + if (existingWorkflowContext.getWorkflowDag() != null) + mergeEntries(edges, existingWorkflowContext.getWorkflowDag().getEntries()); + if (workflowContext.getWorkflowDag() != null) + mergeEntries(edges, workflowContext.getWorkflowDag().getEntries()); + WorkflowDag mergedDag = new WorkflowDag(); + for (Entry<String,Set<String>> edge : edges.entrySet()) { + WorkflowDagEntry entry = new WorkflowDagEntry(); + entry.setSource(edge.getKey()); + entry.getTargets().addAll(edge.getValue()); + mergedDag.addEntry(entry); + } + return mergedDag; + } + + private static WorkflowContext getSanitizedWorkflow(WorkflowContext workflowContext, WorkflowContext existingWorkflowContext) { + WorkflowContext sanitizedWC = new WorkflowContext(); + if (existingWorkflowContext == null) { + sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag()); + sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext()); + } else { + sanitizedWC.setWorkflowDag(constructMergedDag(existingWorkflowContext, workflowContext)); + sanitizedWC.setParentWorkflowContext(existingWorkflowContext.getParentWorkflowContext()); + } + return sanitizedWC; + } + + private static String getWorkflowString(WorkflowContext sanitizedWC) { + String sanitizedWCString = null; + try { + ObjectMapper om = new ObjectMapper(); + sanitizedWCString = om.writeValueAsString(sanitizedWC); + } catch (IOException e) { + e.printStackTrace(); + sanitizedWCString = ""; + } + return sanitizedWCString; + } + private void processJobSubmittedEvent( PreparedStatement jobPS, PreparedStatement workflowSelectPS, PreparedStatement workflowPS, @@ -616,35 +674,35 @@ public class MapReduceJobHistoryUpdater // Get workflow information boolean insertWorkflow = false; + String existingContextString = null; + ResultSet rs = null; try { workflowSelectPS.setString(1, workflowContext.getWorkflowId()); workflowSelectPS.execute(); - ResultSet rs = workflowSelectPS.getResultSet(); - insertWorkflow = !rs.next(); + rs = workflowSelectPS.getResultSet(); + if (rs.next()) { + existingContextString = rs.getString(1); + } else { + insertWorkflow = true; + } } catch (SQLException sqle) { LOG.warn("workflow select failed with: ", sqle); insertWorkflow = false; + } finally { + try { + if (rs != null) + rs.close(); + } catch (SQLException e) { + LOG.error("Exception while closing ResultSet", e); + } } // Insert workflow if (insertWorkflow) { - WorkflowContext sanitizedWC = new WorkflowContext(); - sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag()); - sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext()); - - String sanitizedWCString = null; - try { - ObjectMapper om = new ObjectMapper(); - sanitizedWCString = om.writeValueAsString(sanitizedWC); - } catch (IOException e) { - e.printStackTrace(); - sanitizedWCString = ""; - } - workflowPS.setString(1, workflowContext.getWorkflowId()); workflowPS.setString(2, workflowContext.getWorkflowName()); - workflowPS.setString(3, sanitizedWCString); + workflowPS.setString(3, getWorkflowString(getSanitizedWorkflow(workflowContext, null))); workflowPS.setString(4, historyEvent.getUserName()); workflowPS.setLong(5, historyEvent.getSubmitTime()); workflowPS.setLong(6, historyEvent.getSubmitTime()); @@ -653,10 +711,22 @@ public class MapReduceJobHistoryUpdater LOG.debug("Successfully inserted workflowId = " + workflowContext.getWorkflowId()); } else { - workflowUpdateTimePS.setLong(1, historyEvent.getSubmitTime()); - workflowUpdateTimePS.setLong(2, historyEvent.getSubmitTime()); - workflowUpdateTimePS.setString(3, workflowContext.getWorkflowId()); - workflowUpdateTimePS.setString(4, workflowContext.getWorkflowId()); + ObjectMapper om = new ObjectMapper(); + WorkflowContext existingWorkflowContext = null; + try { + if (existingContextString != null) + existingWorkflowContext = om.readValue(existingContextString.getBytes(), WorkflowContext.class); + } catch (IOException e) { + LOG.warn("Couldn't read existing workflow context for " + workflowContext.getWorkflowId(), e); + } + + WorkflowContext sanitizedWC = getSanitizedWorkflow(workflowContext, existingWorkflowContext); + workflowUpdateTimePS.setString(1, getWorkflowString(sanitizedWC)); + workflowUpdateTimePS.setLong(2, sanitizedWC.getWorkflowDag().size()); + workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime()); + workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime()); + workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId()); + workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId()); workflowUpdateTimePS.executeUpdate(); LOG.debug("Successfully updated workflowId = " + workflowContext.getWorkflowId());
Modified: incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java?rev=1432294&r1=1432293&r2=1432294&view=diff ============================================================================== --- incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java (original) +++ incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java Fri Jan 11 21:35:03 2013 @@ -30,6 +30,7 @@ import org.apache.ambari.eventdb.model.W import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobHistory; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.tools.rumen.JobSubmittedEvent; import org.apache.hadoop.util.StringUtils; @@ -65,20 +66,42 @@ public class TestJobHistoryParsing exten test("id_= 0-1", "something.name", "1=0", adj); } + public void test3() { + String s = "`~!@#$%^&*()-_=+[]{}|,.<>/?;:'\""; + test(s, s, s, new HashMap<String,String[]>()); + } + + public void test4() { + Map<String,String[]> adj = new HashMap<String,String[]>(); + adj.put("X", new String[] {}); + test("", "jobName", "X", adj); + } + public void test(String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adjacencies) { Configuration conf = new Configuration(); setProperties(conf, workflowId, workflowName, workflowNodeName, adjacencies); String log = log("JOB", new String[] {ID, NAME, NODE, ADJ}, new String[] {conf.get(ID_PROP), conf.get(NAME_PROP), conf.get(NODE_PROP), JobHistory.JobInfo.getWorkflowAdjacencies(conf)}); ParsedLine line = new ParsedLine(log); - JobSubmittedEvent event = new JobSubmittedEvent(null, "", "", 0l, "", null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ)); + JobID jobid = new JobID("id", 1); + JobSubmittedEvent event = new JobSubmittedEvent(jobid, workflowName, "", 0l, "", null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ)); WorkflowContext context = MapReduceJobHistoryUpdater.buildWorkflowContext(event); - assertEquals("Didn't recover workflowId", workflowId, context.getWorkflowId()); + + String resultingWorkflowId = workflowId; + if (workflowId.isEmpty()) + resultingWorkflowId = jobid.toString().replace("job_", "mr_"); + assertEquals("Didn't recover workflowId", resultingWorkflowId, context.getWorkflowId()); assertEquals("Didn't recover workflowName", workflowName, context.getWorkflowName()); assertEquals("Didn't recover workflowNodeName", workflowNodeName, context.getWorkflowEntityName()); - assertEquals("Got incorrect number of adjacencies", adjacencies.size(), context.getWorkflowDag().getEntries().size()); + + Map<String,String[]> resultingAdjacencies = adjacencies; + if (resultingAdjacencies.size() == 0) { + resultingAdjacencies = new HashMap<String,String[]>(); + resultingAdjacencies.put(workflowNodeName, new String[] {}); + } + assertEquals("Got incorrect number of adjacencies", resultingAdjacencies.size(), context.getWorkflowDag().getEntries().size()); for (WorkflowDagEntry entry : context.getWorkflowDag().getEntries()) { - String[] sTargets = adjacencies.get(entry.getSource()); + String[] sTargets = resultingAdjacencies.get(entry.getSource()); assertNotNull("No original targets for " + entry.getSource(), sTargets); List<String> dTargets = entry.getTargets(); assertEquals("Got incorrect number of targets for " + entry.getSource(), sTargets.length, dTargets.size()); Added: incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java?rev=1432294&view=auto ============================================================================== --- incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java (added) +++ incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java Fri Jan 11 21:35:03 2013 @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari; + +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.ambari.eventdb.model.WorkflowContext; +import org.apache.ambari.eventdb.model.WorkflowDag; +import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry; +import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater; + +/** + * + */ +public class TestMapReduceJobHistoryUpdater extends TestCase { + public void testDagMerging() { + WorkflowDag dag1 = new WorkflowDag(); + dag1.addEntry(getEntry("a", "b", "c")); + dag1.addEntry(getEntry("b", "d")); + WorkflowContext one = new WorkflowContext(); + one.setWorkflowDag(dag1); + + WorkflowDag dag2 = new WorkflowDag(); + dag2.addEntry(getEntry("a", "d")); + dag2.addEntry(getEntry("c", "e")); + WorkflowContext two = new WorkflowContext(); + two.setWorkflowDag(dag2); + + WorkflowDag emptyDag = new WorkflowDag(); + WorkflowContext three = new WorkflowContext(); + three.setWorkflowDag(emptyDag); + + WorkflowDag mergedDag = new WorkflowDag(); + mergedDag.addEntry(getEntry("a", "b", "c", "d")); + mergedDag.addEntry(getEntry("b", "d")); + mergedDag.addEntry(getEntry("c", "e")); + + assertEquals(mergedDag, MapReduceJobHistoryUpdater.constructMergedDag(one, two)); + assertEquals(mergedDag, MapReduceJobHistoryUpdater.constructMergedDag(two, one)); + + // test blank dag + assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(three, one)); + assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(one, three)); + assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(three, two)); + assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(two, three)); + + // test null dag + assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(new WorkflowContext(), one)); + assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(one, new WorkflowContext())); + assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(new WorkflowContext(), two)); + assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(two, new WorkflowContext())); + + // test same dag + assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(one, one)); + assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(two, two)); + assertEquals(emptyDag, MapReduceJobHistoryUpdater.constructMergedDag(three, three)); + } + + private static WorkflowDagEntry getEntry(String source, String... targets) { + WorkflowDagEntry entry = new WorkflowDagEntry(); + entry.setSource(source); + for (String target : targets) { + entry.addTarget(target); + } + return entry; + } + + private static void assertEquals(WorkflowDag dag1, WorkflowDag dag2) { + assertEquals(dag1.size(), dag2.size()); + List<WorkflowDagEntry> entries1 = dag1.getEntries(); + List<WorkflowDagEntry> entries2 = dag2.getEntries(); + assertEquals(entries1.size(), entries2.size()); + for (int i = 0; i < entries1.size(); i++) { + WorkflowDagEntry e1 = entries1.get(i); + WorkflowDagEntry e2 = entries2.get(i); + assertEquals(e1.getSource(), e2.getSource()); + List<String> t1 = e1.getTargets(); + List<String> t2 = e2.getTargets(); + assertEquals(t1.size(), t2.size()); + for (int j = 0; j < t1.size(); j++) { + assertEquals(t1.get(j), t2.get(j)); + } + } + } +} Modified: incubator/ambari/trunk/pom.xml URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/pom.xml?rev=1432294&r1=1432293&r2=1432294&view=diff ============================================================================== --- incubator/ambari/trunk/pom.xml (original) +++ incubator/ambari/trunk/pom.xml Fri Jan 11 21:35:03 2013 @@ -108,6 +108,37 @@ <mappings/> </configuration> </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*.json</exclude> + <exclude>derby.log</exclude> + <exclude>AMBARI-666-CHANGES.txt</exclude> + <exclude>pass.txt</exclude> + <exclude>contrib/addons/test/dataServices/jmx/data/cluster_configuration.json.nohbase</exclude> + + <!--IDE and GIT files--> + <exclude>.idea/</exclude> + <exclude>.git/</exclude> + <exclude>**/.gitignore</exclude> + <exclude>**/.gitattributes</exclude> + + <!--gitignore content--> + <exclude>.DS_Store</exclude> + <exclude>.iml/</exclude> + <exclude>.classpath</exclude> + <exclude>.project</exclude> + <exclude>.settings</exclude> + <exclude>*.pyc</exclude> + <exclude>*.py~</exclude> + <exclude>.hg</exclude> + <exclude>.hgignore</exclude> + <exclude>.hgtags</exclude> + </excludes> + </configuration> + </plugin> </plugins> </build> </project>
