Modified: 
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
URL: 
http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java?rev=1432294&r1=1432293&r2=1432294&view=diff
==============================================================================
--- 
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
 (original)
+++ 
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
 Fri Jan 11 21:35:03 2013
@@ -23,7 +23,12 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -127,7 +132,7 @@ public class MapReduceJobHistoryUpdater 
     
     workflowSelectPS =
         connection.prepareStatement(
-            "SELECT workflowId FROM " + WORKFLOW_TABLE + " where workflowId = 
?"
+            "SELECT workflowContext FROM " + WORKFLOW_TABLE + " where 
workflowId = ?"
             );
 
     workflowPS = 
@@ -154,6 +159,8 @@ public class MapReduceJobHistoryUpdater 
             "UPDATE " +
                 WORKFLOW_TABLE +
                 " SET " +
+                "workflowContext = ?, " +
+                "numJobsTotal = ?, " +
                 "lastUpdateTime = ?, " +
                 "duration = ? - (SELECT startTime FROM " +
                 WORKFLOW_TABLE +
@@ -597,6 +604,57 @@ public class MapReduceJobHistoryUpdater 
     return context;
   }
   
+  public static void mergeEntries(Map<String, Set<String>> edges, 
List<WorkflowDagEntry> entries) {
+    if (entries == null)
+      return;
+    for (WorkflowDagEntry entry : entries) {
+      if (!edges.containsKey(entry.getSource()))
+        edges.put(entry.getSource(), new TreeSet<String>());
+      Set<String> targets = edges.get(entry.getSource());
+      targets.addAll(entry.getTargets());
+    }
+  }
+  
+  /**
+   * Merges the DAGs of two workflow contexts into a newly built DAG.
+   *
+   * <p>Every source node is mapped to the union of its targets in both DAGs.
+   * Edges are collected in a TreeMap of TreeSets, so the merged entries come
+   * out sorted by source with sorted, de-duplicated targets, and the result
+   * does not depend on which context is passed first. A null context, or a
+   * context without a DAG, contributes no edges.
+   *
+   * @param workflowContext context from the current job submission; may be null
+   * @param existingWorkflowContext previously stored context; may be null
+   * @return the merged DAG, never null (empty when neither input has entries)
+   */
+  public static WorkflowDag constructMergedDag(WorkflowContext workflowContext, WorkflowContext existingWorkflowContext) {
+    Map<String, Set<String>> edges = new TreeMap<String, Set<String>>();
+    // A missing context is treated exactly like a context without a DAG.
+    if (existingWorkflowContext != null && existingWorkflowContext.getWorkflowDag() != null) {
+      mergeEntries(edges, existingWorkflowContext.getWorkflowDag().getEntries());
+    }
+    if (workflowContext != null && workflowContext.getWorkflowDag() != null) {
+      mergeEntries(edges, workflowContext.getWorkflowDag().getEntries());
+    }
+    WorkflowDag mergedDag = new WorkflowDag();
+    for (Entry<String, Set<String>> edge : edges.entrySet()) {
+      WorkflowDagEntry entry = new WorkflowDagEntry();
+      entry.setSource(edge.getKey());
+      entry.getTargets().addAll(edge.getValue());
+      mergedDag.addEntry(entry);
+    }
+    return mergedDag;
+  }
+  
+  /**
+   * Builds the reduced workflow context that is written to the
+   * workflowContext column: only the DAG and the parent workflow context are
+   * copied into a fresh WorkflowContext.
+   *
+   * @param workflowContext context built from the current job submission
+   * @param existingWorkflowContext context previously stored for the same
+   *        workflow, or null when none was found
+   * @return a new context; without an existing context it carries the new
+   *         DAG and parent, otherwise the merged DAG and the existing parent
+   */
+  private static WorkflowContext getSanitizedWorkflow(WorkflowContext workflowContext, WorkflowContext existingWorkflowContext) {
+    WorkflowContext sanitizedWC = new WorkflowContext();
+    if (existingWorkflowContext == null) {
+      sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag());
+      sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext());
+    } else {
+      // Arguments follow constructMergedDag's declared order (new, existing).
+      sanitizedWC.setWorkflowDag(constructMergedDag(workflowContext, existingWorkflowContext));
+      // Keep the parent recorded when this workflow was first stored.
+      sanitizedWC.setParentWorkflowContext(existingWorkflowContext.getParentWorkflowContext());
+    }
+    return sanitizedWC;
+  }
+  
+  /**
+   * Serializes a workflow context to JSON text using a Jackson ObjectMapper.
+   *
+   * @param sanitizedWC the context to serialize
+   * @return the JSON text, or an empty string if serialization throws an
+   *         IOException
+   */
+  private static String getWorkflowString(WorkflowContext sanitizedWC) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(sanitizedWC);
+    } catch (IOException e) {
+      // NOTE(review): consider reporting this through the class logger
+      // instead of printing the stack trace to stderr.
+      e.printStackTrace();
+      return "";
+    }
+  }
+  
   private void processJobSubmittedEvent(
       PreparedStatement jobPS, 
       PreparedStatement workflowSelectPS, PreparedStatement workflowPS, 
@@ -616,35 +674,35 @@ public class MapReduceJobHistoryUpdater 
       
       // Get workflow information
       boolean insertWorkflow = false;
+      String existingContextString = null;
       
+      ResultSet rs = null;
       try {
         workflowSelectPS.setString(1, workflowContext.getWorkflowId());
         workflowSelectPS.execute();
-        ResultSet rs = workflowSelectPS.getResultSet();
-        insertWorkflow = !rs.next();
+        rs = workflowSelectPS.getResultSet();
+        if (rs.next()) {
+          existingContextString = rs.getString(1);
+        } else {
+          insertWorkflow = true;
+        }
       } catch (SQLException sqle) {
         LOG.warn("workflow select failed with: ", sqle);
         insertWorkflow = false;
+      } finally {
+        try {
+          if (rs != null)
+            rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
       }
 
       // Insert workflow 
       if (insertWorkflow) {
-        WorkflowContext sanitizedWC = new WorkflowContext();
-        sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag());
-        
sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext());
-
-        String sanitizedWCString = null;
-        try {
-          ObjectMapper om = new ObjectMapper();
-          sanitizedWCString = om.writeValueAsString(sanitizedWC);
-        } catch (IOException e) {
-          e.printStackTrace();
-          sanitizedWCString = "";
-        } 
-
         workflowPS.setString(1, workflowContext.getWorkflowId());
         workflowPS.setString(2, workflowContext.getWorkflowName());
-        workflowPS.setString(3, sanitizedWCString);
+        workflowPS.setString(3, 
getWorkflowString(getSanitizedWorkflow(workflowContext, null)));
         workflowPS.setString(4, historyEvent.getUserName());
         workflowPS.setLong(5, historyEvent.getSubmitTime());
         workflowPS.setLong(6, historyEvent.getSubmitTime());
@@ -653,10 +711,22 @@ public class MapReduceJobHistoryUpdater 
         LOG.debug("Successfully inserted workflowId = " + 
             workflowContext.getWorkflowId());
       } else {
-        workflowUpdateTimePS.setLong(1, historyEvent.getSubmitTime());
-        workflowUpdateTimePS.setLong(2, historyEvent.getSubmitTime());
-        workflowUpdateTimePS.setString(3, workflowContext.getWorkflowId());
-        workflowUpdateTimePS.setString(4, workflowContext.getWorkflowId());
+        ObjectMapper om = new ObjectMapper();
+        WorkflowContext existingWorkflowContext = null;
+        try {
+          if (existingContextString != null)
+            existingWorkflowContext = 
om.readValue(existingContextString.getBytes(), WorkflowContext.class);
+        } catch (IOException e) {
+          LOG.warn("Couldn't read existing workflow context for " + 
workflowContext.getWorkflowId(), e);
+        }
+        
+        WorkflowContext sanitizedWC = getSanitizedWorkflow(workflowContext, 
existingWorkflowContext);
+        workflowUpdateTimePS.setString(1, getWorkflowString(sanitizedWC));
+        workflowUpdateTimePS.setLong(2, sanitizedWC.getWorkflowDag().size());
+        workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime());
+        workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime());
+        workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId());
+        workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId());
         workflowUpdateTimePS.executeUpdate();
         LOG.debug("Successfully updated workflowId = " + 
             workflowContext.getWorkflowId());

Modified: 
incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java
URL: 
http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java?rev=1432294&r1=1432293&r2=1432294&view=diff
==============================================================================
--- 
incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java
 (original)
+++ 
incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java
 Fri Jan 11 21:35:03 2013
@@ -30,6 +30,7 @@ import org.apache.ambari.eventdb.model.W
 import 
org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
 import org.apache.hadoop.util.StringUtils;
 
@@ -65,20 +66,42 @@ public class TestJobHistoryParsing exten
     test("id_= 0-1", "something.name", "1=0", adj);
   }
   
+  /**
+   * Passes a string of punctuation and quote characters through the workflow
+   * id, name and node name, with no adjacencies declared.
+   */
+  public void test3() {
+    String punctuation = "`~!@#$%^&*()-_=+[]{}|,.<>/?;:'\"";
+    Map<String,String[]> noAdjacencies = new HashMap<String,String[]>();
+    test(punctuation, punctuation, punctuation, noAdjacencies);
+  }
+  
+  public void test4() {
+    Map<String,String[]> adj = new HashMap<String,String[]>();
+    adj.put("X", new String[] {});
+    test("", "jobName", "X", adj);
+  }
+  
   public void test(String workflowId, String workflowName, String 
workflowNodeName, Map<String,String[]> adjacencies) {
     Configuration conf = new Configuration();
     setProperties(conf, workflowId, workflowName, workflowNodeName, 
adjacencies);
     String log = log("JOB", new String[] {ID, NAME, NODE, ADJ},
         new String[] {conf.get(ID_PROP), conf.get(NAME_PROP), 
conf.get(NODE_PROP), JobHistory.JobInfo.getWorkflowAdjacencies(conf)});
     ParsedLine line = new ParsedLine(log);
-    JobSubmittedEvent event = new JobSubmittedEvent(null, "", "", 0l, "", 
null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ));
+    JobID jobid = new JobID("id", 1);
+    JobSubmittedEvent event = new JobSubmittedEvent(jobid, workflowName, "", 
0l, "", null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ));
     WorkflowContext context = 
MapReduceJobHistoryUpdater.buildWorkflowContext(event);
-    assertEquals("Didn't recover workflowId", workflowId, 
context.getWorkflowId());
+    
+    String resultingWorkflowId = workflowId;
+    if (workflowId.isEmpty())
+      resultingWorkflowId = jobid.toString().replace("job_", "mr_");
+    assertEquals("Didn't recover workflowId", resultingWorkflowId, 
context.getWorkflowId());
     assertEquals("Didn't recover workflowName", workflowName, 
context.getWorkflowName());
     assertEquals("Didn't recover workflowNodeName", workflowNodeName, 
context.getWorkflowEntityName());
-    assertEquals("Got incorrect number of adjacencies", adjacencies.size(), 
context.getWorkflowDag().getEntries().size());
+    
+    Map<String,String[]> resultingAdjacencies = adjacencies;
+    if (resultingAdjacencies.size() == 0) {
+      resultingAdjacencies = new HashMap<String,String[]>();
+      resultingAdjacencies.put(workflowNodeName, new String[] {});
+    }
+    assertEquals("Got incorrect number of adjacencies", 
resultingAdjacencies.size(), context.getWorkflowDag().getEntries().size());
     for (WorkflowDagEntry entry : context.getWorkflowDag().getEntries()) {
-      String[] sTargets = adjacencies.get(entry.getSource());
+      String[] sTargets = resultingAdjacencies.get(entry.getSource());
       assertNotNull("No original targets for " + entry.getSource(), sTargets);
       List<String> dTargets = entry.getTargets();
       assertEquals("Got incorrect number of targets for " + entry.getSource(), 
sTargets.length, dTargets.size());

Added: 
incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java
URL: 
http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java?rev=1432294&view=auto
==============================================================================
--- 
incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java
 (added)
+++ 
incubator/ambari/trunk/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestMapReduceJobHistoryUpdater.java
 Fri Jan 11 21:35:03 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari;
+
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag;
+import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
+import 
org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater;
+
+/**
+ * Tests for {@link MapReduceJobHistoryUpdater#constructMergedDag}: merging two
+ * different DAGs in both argument orders, merging with an empty DAG, merging
+ * with a context that has no DAG, and merging a DAG with itself.
+ */
+public class TestMapReduceJobHistoryUpdater extends TestCase {
+  public void testDagMerging() {
+    WorkflowDag dag1 = new WorkflowDag();
+    dag1.addEntry(getEntry("a", "b", "c"));
+    dag1.addEntry(getEntry("b", "d"));
+    WorkflowContext one = new WorkflowContext();
+    one.setWorkflowDag(dag1);
+    
+    WorkflowDag dag2 = new WorkflowDag();
+    dag2.addEntry(getEntry("a", "d"));
+    dag2.addEntry(getEntry("c", "e"));
+    WorkflowContext two = new WorkflowContext();
+    two.setWorkflowDag(dag2);
+    
+    WorkflowDag emptyDag = new WorkflowDag();
+    WorkflowContext three = new WorkflowContext();
+    three.setWorkflowDag(emptyDag);
+    
+    WorkflowDag mergedDag = new WorkflowDag();
+    mergedDag.addEntry(getEntry("a", "b", "c", "d"));
+    mergedDag.addEntry(getEntry("b", "d"));
+    mergedDag.addEntry(getEntry("c", "e"));
+    
+    assertEquals(mergedDag, MapReduceJobHistoryUpdater.constructMergedDag(one, two));
+    assertEquals(mergedDag, MapReduceJobHistoryUpdater.constructMergedDag(two, one));
+    
+    // test blank dag
+    assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(three, one));
+    assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(one, three));
+    assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(three, two));
+    assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(two, three));
+    
+    // test null dag
+    assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(new WorkflowContext(), one));
+    assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(one, new WorkflowContext()));
+    assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(new WorkflowContext(), two));
+    assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(two, new WorkflowContext()));
+    
+    // test same dag
+    assertEquals(dag1, MapReduceJobHistoryUpdater.constructMergedDag(one, one));
+    assertEquals(dag2, MapReduceJobHistoryUpdater.constructMergedDag(two, two));
+    assertEquals(emptyDag, MapReduceJobHistoryUpdater.constructMergedDag(three, three));
+  }
+  
+  /** Builds a DAG entry with the given source and targets, added in order. */
+  private static WorkflowDagEntry getEntry(String source, String... targets) {
+    WorkflowDagEntry entry = new WorkflowDagEntry();
+    entry.setSource(source);
+    for (String target : targets) {
+      entry.addTarget(target);
+    }
+    return entry;
+  }
+  
+  /**
+   * Asserts that two DAGs have the same entries in the same order, each with
+   * the same source and the same targets in the same order. Every assertion
+   * carries a message naming the entry or target that differs.
+   *
+   * @param expected the expected DAG
+   * @param actual the DAG under test
+   */
+  private static void assertEquals(WorkflowDag expected, WorkflowDag actual) {
+    assertEquals("DAG size", expected.size(), actual.size());
+    List<WorkflowDagEntry> expectedEntries = expected.getEntries();
+    List<WorkflowDagEntry> actualEntries = actual.getEntries();
+    assertEquals("number of DAG entries", expectedEntries.size(), actualEntries.size());
+    for (int i = 0; i < expectedEntries.size(); i++) {
+      WorkflowDagEntry expectedEntry = expectedEntries.get(i);
+      WorkflowDagEntry actualEntry = actualEntries.get(i);
+      assertEquals("source of entry " + i, expectedEntry.getSource(), actualEntry.getSource());
+      List<String> expectedTargets = expectedEntry.getTargets();
+      List<String> actualTargets = actualEntry.getTargets();
+      assertEquals("number of targets for " + expectedEntry.getSource(),
+          expectedTargets.size(), actualTargets.size());
+      for (int j = 0; j < expectedTargets.size(); j++) {
+        assertEquals("target " + j + " of " + expectedEntry.getSource(),
+            expectedTargets.get(j), actualTargets.get(j));
+      }
+    }
+  }
+}

Modified: incubator/ambari/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/ambari/trunk/pom.xml?rev=1432294&r1=1432293&r2=1432294&view=diff
==============================================================================
--- incubator/ambari/trunk/pom.xml (original)
+++ incubator/ambari/trunk/pom.xml Fri Jan 11 21:35:03 2013
@@ -108,6 +108,37 @@
           <mappings/>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/*.json</exclude>
+            <exclude>derby.log</exclude>
+            <exclude>AMBARI-666-CHANGES.txt</exclude>
+            <exclude>pass.txt</exclude>
+            <exclude>contrib/addons/test/dataServices/jmx/data/cluster_configuration.json.nohbase</exclude>
+
+            <!--IDE and GIT files-->
+            <exclude>.idea/</exclude>
+            <exclude>.git/</exclude>
+            <exclude>**/.gitignore</exclude>
+            <exclude>**/.gitattributes</exclude>
+
+            <!--gitignore content. Excludes are Ant-style patterns relative to
+                the project root, so "**/" is needed to also match these files
+                inside modules, as .gitignore entries without a slash do.-->
+            <exclude>**/.DS_Store</exclude>
+            <exclude>**/*.iml</exclude>
+            <exclude>**/.classpath</exclude>
+            <exclude>**/.project</exclude>
+            <exclude>**/.settings/</exclude>
+            <exclude>**/*.pyc</exclude>
+            <exclude>**/*.py~</exclude>
+            <exclude>.hg</exclude>
+            <exclude>.hgignore</exclude>
+            <exclude>.hgtags</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>


Reply via email to