Author: mahadev
Date: Fri Jan 11 05:20:30 2013
New Revision: 1431892

URL: http://svn.apache.org/viewvc?rev=1431892&view=rev
Log:
AMBARI-1114. BootStrap fails but the api says thats its done and exit status is 
0. (Nate Cole via mahadev)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp
    incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb
    incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb
    incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
    incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py
    incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
    incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php
    incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Jan 11 05:20:30 2013
@@ -16,6 +16,9 @@ should be listed by their full name.
  returns 200 (no op), even though GET with the same predicate returns a number
  of host_components. (Tom Beerbower via mahadev)
 
+ AMBARI-1114. BootStrap fails but the api says thats its done and exit status
+ is 0. (Nate Cole via mahadev)
+
  BUG FIXES
 
 

Modified: incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp Fri Jan 11 05:20:30 2013
@@ -38,6 +38,17 @@ class hdp-mysql::server(
       require   => Anchor['hdp-mysql::server::begin']
     }
 
+
+    if ($hdp::params::hdp_os_type == "suse") {
+      # On Suse, creating symlink from default mysqld pid file to expected /var/run location
+      file { '/var/run/mysqld.pid':
+         ensure => 'link',
+         target => '/var/lib/mysql/mysqld.pid',
+         require => Hdp::Package['mysql'],
+      }
+    }
+
+
     if hdp_is_empty($hdp::params::services_names[mysql]) {
       hdp_fail("There is no service name for service mysql")
     }
@@ -58,28 +69,52 @@ class hdp-mysql::server(
       $service_name = $service_name_by_os[$hdp::params::hdp_os_type]
     }
 
-    service {$service_name:
-      ensure => running,
-      require => Hdp::Package['mysql'],
-      notify  => File['/tmp/startMysql.sh']
-    }
-
-    file { '/tmp/startMysql.sh':
-      ensure => present,
-      source => "puppet:///modules/hdp-mysql/startMysql.sh",
-      mode => '0755',
-      require => service[$service_name],
-      notify => Exec['/tmp/startMysql.sh']
-    }
-
-    exec { '/tmp/startMysql.sh':
-      command   => "sh /tmp/startMysql.sh ${db_user} ${db_pw} ${host}",
-      tries     => 3,
-      try_sleep => 5,
-      require   => File['/tmp/startMysql.sh'],
-      path      => '/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
-      notify   => Anchor['hdp-mysql::server::end'],
-      logoutput => "true"
+    $mysqld_state = $service_state ? {
+     'running' => 'running',
+      default =>  'stopped',
+    }
+
+    if ($hdp::params::hdp_os_type == "suse") {
+      service {$service_name:
+        ensure => $mysqld_state,
+        require => File['/var/run/mysqld.pid'],
+        notify  => File['/tmp/addMysqlUser.sh']
+      }
+    } else {
+        service {$service_name:
+        ensure => $mysqld_state,
+        require => Hdp::Package['mysql'],
+        notify  => File['/tmp/addMysqlUser.sh']
+      }
+    }
+
+
+    if ($service_state == 'installed_and_configured') {
+
+      file {'/tmp/addMysqlUser.sh':
+        ensure => present,
+        source => "puppet:///modules/hdp-mysql/addMysqlUser.sh",
+        mode => '0755',
+        require => Service[$service_name],
+        notify => Exec['/tmp/addMysqlUser.sh'],
+      }
+      # We start the DB and add a user
+      exec { '/tmp/addMysqlUser.sh':
+        command   => "sh /tmp/addMysqlUser.sh ${service_name} ${db_user} ${db_pw} ${host}",
+        tries     => 3,
+        try_sleep => 5,
+        require   => File['/tmp/addMysqlUser.sh'],
+        path      => '/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
+        notify   => Anchor['hdp-mysql::server::end'],
+        logoutput => "true",
+      }
+    } else {
+      # Now MySQL is running so we remove the temporary file
+      file {'/tmp/addMysqlUser.sh':
+        ensure => absent,
+        require => Service[$service_name],
+        notify => Anchor['hdp-mysql::server::end'],
+      }
     }
 
     anchor { 'hdp-mysql::server::end':}

Modified: incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb Fri Jan 11 05:20:30 2013
@@ -15,8 +15,8 @@ define servicegroup {
   alias  OOZIE Checks
 }
 define servicegroup {
-  servicegroup_name  TEMPLETON
-  alias  TEMPLETON Checks
+  servicegroup_name  WEBHCAT
+  alias  WEBHCAT Checks
 }
 define servicegroup {
   servicegroup_name  NAGIOS

Modified: incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb Fri Jan 11 05:20:30 2013
@@ -449,13 +449,13 @@ define service {
         max_check_attempts      3
 }
 <%end-%>
-<%if scope.function_hdp_nagios_members_exist('templeton-server')-%>
-# Templeton check
+<%if scope.function_hdp_nagios_members_exist('webhcat-server')-%>
+# WEBHCAT check
 define service {
-        hostgroup_name          templeton-server
+        hostgroup_name          webhcat-server
         use                     hadoop-service
-        service_description     TEMPLETON::Templeton status check
-        servicegroups           TEMPLETON
+        service_description     WEBHCAT::WEBHCAT status check
+        servicegroups           WEBHCAT 
         <%if scope.function_hdp_template_var("security_enabled")-%>
         check_command           check_templeton_status!50111!v1!true!<%=scope.function_hdp_template_var("keytab_path")%>/<%=scope.function_hdp_template_var("nagios_user")%>.headless.keytab!<%=scope.function_hdp_template_var("nagios_user")%>
         <%else-%>

Modified: incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp Fri Jan 11 05:20:30 2013
@@ -40,11 +40,24 @@ class hdp-oozie::service(
     $lzo_jar_suffix = ""
   }
 
+
   if ($ensure == 'running') {
-    $daemon_cmd = "/bin/sh -c 'cd /usr/lib/oozie && tar -xvf oozie-sharelib.tar.gz && mkdir -p ${oozie_tmp} && chown ${user}:hadoop ${oozie_tmp} && cd ${oozie_tmp}' && su - ${user} -c '/usr/lib/oozie/bin/oozie-setup.sh -hadoop 0.20.200 $jar_location -extjs $ext_js_path $lzo_jar_suffix && /usr/lib/oozie/bin/ooziedb.sh create -sqlfile oozie.sql -run ; hadoop dfs -put /usr/lib/oozie/share share ; hadoop dfs -chmod -R 755 /user/${user}/share && /usr/lib/oozie/bin/oozie-start.sh' "
+
+    $cmd1 = "cd /usr/lib/oozie && tar -xvf oozie-sharelib.tar.gz"
+    $cmd2 =  "cd /usr/lib/oozie && mkdir -p ${oozie_tmp}"
+    $cmd3 =  "cd /usr/lib/oozie && chown ${user}:hadoop ${oozie_tmp}"
+    
+    $cmd4 =  "cd ${oozie_tmp} && /usr/lib/oozie/bin/oozie-setup.sh -hadoop 0.20.200 $jar_location -extjs $ext_js_path $lzo_jar_suffix"
+    $cmd5 =  "cd ${oozie_tmp} && /usr/lib/oozie/bin/ooziedb.sh create -sqlfile oozie.sql -run ; hadoop dfs -put /usr/lib/oozie/share share ; hadoop dfs -chmod -R 755 /user/${user}/share"
+    $cmd6 =  "/usr/lib/oozie/bin/oozie-start.sh"
+
+
+    $sh_cmds = [$cmd1, $cmd2, $cmd3]
+    $user_cmds = [$cmd4, $cmd5, $cmd6]
+       
     $no_op_test = "ls ${pid_file} >/dev/null 2>&1 && ps `cat ${pid_file}` >/dev/null 2>&1"
   } elsif ($ensure == 'stopped') {
-    $daemon_cmd  = "su - ${user} -c  'cd ${oozie_tmp} && /usr/lib/oozie/bin/oozie-stop.sh'"
+    $stop_cmd  = "su - ${user} -c  'cd ${oozie_tmp} && /usr/lib/oozie/bin/oozie-stop.sh'"
     $no_op_test = undef
   } else {
     $daemon_cmd = undef
@@ -58,13 +71,16 @@ class hdp-oozie::service(
 
   anchor{'hdp-oozie::service::begin':} -> Hdp-oozie::Service::Directory<||> -> anchor{'hdp-oozie::service::end':}
   
-  if ($daemon_cmd != undef) {
-    hdp::exec { $daemon_cmd:
-      command => $daemon_cmd,
+  if ($ensure == 'running') {
+    hdp-oozie::service::exec_sh{$sh_cmds:}
+    hdp-oozie::service::exec_user{$user_cmds:}
+    Hdp-oozie::Service::Directory<||> -> Hdp-oozie::Service::Exec_sh[$cmd1] -> Hdp-oozie::Service::Exec_sh[$cmd2] ->Hdp-oozie::Service::Exec_sh[$cmd3] -> Hdp-oozie::Service::Exec_user[$cmd4] ->Hdp-oozie::Service::Exec_user[$cmd5] -> Hdp-oozie::Service::Exec_user[$cmd6] -> Anchor['hdp-oozie::service::end']
+  } elsif ($ensure == 'stopped') {
+    hdp::exec { "exec $stop_cmd":
+      command => $stop_cmd,
       unless  => $no_op_test,
       initial_wait => $initial_wait
-    }
-    Hdp-oozie::Service::Directory<||> -> Hdp::Exec[$daemon_cmd] -> Anchor['hdp-oozie::service::end']
+  }
   }
 }
 
@@ -84,3 +100,23 @@ define hdp-oozie::service::createsymlink
     unless => "test -e /usr/lib/oozie/oozie-server/lib/mapred-site.xml"
   }
 }
+
+define hdp-oozie::service::exec_sh()
+{
+  $no_op_test = "ls ${pid_file} >/dev/null 2>&1 && ps `cat ${pid_file}` >/dev/null 2>&1"
+  hdp::exec { "exec $name":
+    command => "/bin/sh -c '$name'",
+    unless  => $no_op_test,
+    initial_wait => $initial_wait
+  }
+}
+
+define hdp-oozie::service::exec_user()
+{
+  $no_op_test = "ls ${pid_file} >/dev/null 2>&1 && ps `cat ${pid_file}` >/dev/null 2>&1"
+  hdp::exec { "exec $name":
+    command => "su - ${user} -c '$name'",
+    unless  => $no_op_test,
+    initial_wait => $initial_wait
+  }
+}

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java Fri Jan 11 05:20:30 2013
@@ -19,17 +19,25 @@ package org.apache.ambari.eventdb.db;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.ambari.eventdb.model.DataTable;
 import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
 import org.apache.ambari.eventdb.model.TaskAttempt;
 import org.apache.ambari.eventdb.model.WorkflowContext;
-import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
+import org.apache.ambari.eventdb.model.Workflows;
+import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
 
 public interface DBConnector {
   public void submitJob(JobDBEntry j, WorkflowContext context) throws IOException;
   
   public void updateJob(JobDBEntry j) throws IOException;
   
-  public List<WorkflowDBEntry> fetchWorkflows() throws IOException;
+  public Workflows fetchWorkflows() throws IOException;
+  
+  public Workflows fetchWorkflows(WorkflowFields field, boolean sortAscending, int offset, int limit) throws IOException;
+  
+  public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields field, boolean sortAscending, String searchWorkflowId,
+      String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
+      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException;
   
   public List<JobDBEntry> fetchJobDetails(String workflowID) throws IOException;
   

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java Fri Jan 11 05:20:30 2013
@@ -16,32 +16,44 @@
  */
 package org.apache.ambari.eventdb.db;
 
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.eventdb.model.DataTable;
+import org.apache.ambari.eventdb.model.DataTable.AvgData;
+import org.apache.ambari.eventdb.model.DataTable.Summary;
+import org.apache.ambari.eventdb.model.DataTable.Summary.SummaryFields;
+import org.apache.ambari.eventdb.model.DataTable.Times;
 import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
 import org.apache.ambari.eventdb.model.Jobs.JobDBEntry.JobFields;
 import org.apache.ambari.eventdb.model.TaskAttempt;
 import org.apache.ambari.eventdb.model.TaskAttempt.TaskAttemptFields;
 import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.Workflows;
 import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
 import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import java.io.IOException;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-
 public class PostgresConnector implements DBConnector {
   private static Log LOG = LogFactory.getLog(PostgresConnector.class);
   private static final String WORKFLOW_TABLE_NAME = "workflow";
   private static final String JOB_TABLE_NAME = "job";
   private static final String TASK_ATTEMPT_TABLE_NAME = "taskattempt";
-  private static final String AGGREGATE_INPUTBYTES = "inputBytes";
-  private static final String AGGREGATE_OUTPUTBYTES = "outputBytes";
+  public static final String SORT_ASC = "ASC";
+  public static final String SORT_DESC = "DESC";
   
   private static final ObjectMapper jsonMapper = new ObjectMapper();
   
@@ -54,9 +66,15 @@ public class PostgresConnector implement
     UJ_UPDATE_JOB_PS(""),
     UJ_UPDATE_WORKFLOW_PS(""),
     FW_PS("SELECT " + WorkflowDBEntry.WORKFLOW_FIELDS + " FROM " + WORKFLOW_TABLE_NAME),
+    FW_COUNT_PS("SELECT count(*) as " + SummaryFields.numRows + " FROM " + WORKFLOW_TABLE_NAME),
+    FW_SUMMARY_PS("SELECT count(*) as " + SummaryFields.numRows + ", "
+        + getAvg(WorkflowFields.NUMJOBSTOTAL, SummaryFields.avgJobs, SummaryFields.minJobs, SummaryFields.maxJobs) + ", "
+        + getAvg(WorkflowFields.INPUTBYTES, SummaryFields.avgInput, SummaryFields.minInput, SummaryFields.maxInput) + ", "
+        + getAvg(WorkflowFields.OUTPUTBYTES, SummaryFields.avgOutput, SummaryFields.minOutput, SummaryFields.maxOutput) + ", "
+        + getAvg(WorkflowFields.DURATION, SummaryFields.avgDuration, SummaryFields.minDuration, SummaryFields.maxDuration) + ", min("
+        + WorkflowFields.STARTTIME + ") as " + SummaryFields.youngest + ", max(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.oldest + " FROM "
+        + WORKFLOW_TABLE_NAME),
     FJD_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
-    WORKFLOW_AGGREGATE_IO("SELECT SUM(" + JobFields.INPUTBYTES + ") as " + AGGREGATE_INPUTBYTES + ", SUM(" + JobFields.OUTPUTBYTES + ") as "
-        + AGGREGATE_OUTPUTBYTES + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
     FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
     FTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? AND "
         + TaskAttemptFields.TASKTYPE + " = ? ORDER BY " + TaskAttemptFields.STARTTIME);
@@ -70,6 +88,10 @@ public class PostgresConnector implement
     public String getStatementString() {
       return statementString;
     }
+    
+    private static String getAvg(WorkflowFields field, SummaryFields avg, SummaryFields min, SummaryFields max) {
+      return "avg(" + field + ") as " + avg + ", min(" + field + ") as " + min + ", max(" + field + ") as " + max;
+    }
   }
   
   private Map<Statements,PreparedStatement> preparedStatements = new EnumMap<Statements,PreparedStatement>(Statements.class);
@@ -90,74 +112,150 @@ public class PostgresConnector implement
   
   @Override
   public void submitJob(JobDBEntry j, WorkflowContext context) throws IOException {
-//    PreparedStatement insertJobPS = getPS(Statements.SJ_INSERT_JOB_PS);
-//    PreparedStatement checkWorkflowPS = getPS(Statements.SJ_CHECK_WORKFLOW_PS);
-//    PreparedStatement insertWorkflowPS = getPS(Statements.SJ_INSERT_WORKFLOW_PS);
+    // PreparedStatement insertJobPS = getPS(Statements.SJ_INSERT_JOB_PS);
+    // PreparedStatement checkWorkflowPS = getPS(Statements.SJ_CHECK_WORKFLOW_PS);
+    // PreparedStatement insertWorkflowPS = getPS(Statements.SJ_INSERT_WORKFLOW_PS);
     throw new NotImplementedException();
   }
   
   @Override
   public void updateJob(JobDBEntry j) throws IOException {
-//    PreparedStatement updateJobPS = getPS(Statements.UJ_UPDATE_JOB_PS);
-//    PreparedStatement updateWorkflowPS = getPS(Statements.UJ_UPDATE_WORKFLOW_PS);
+    // PreparedStatement updateJobPS = getPS(Statements.UJ_UPDATE_JOB_PS);
+    // PreparedStatement updateWorkflowPS = getPS(Statements.UJ_UPDATE_WORKFLOW_PS);
     throw new NotImplementedException();
   }
   
   @Override
-  public List<WorkflowDBEntry> fetchWorkflows() throws IOException {
-    PreparedStatement ps = getPS(Statements.FW_PS);
+  public Workflows fetchWorkflows() throws IOException {
+    Workflows workflows = new Workflows();
+    workflows.setWorkflows(fetchWorkflows(getPS(Statements.FW_PS)));
+    workflows.setSummary(fetchSummary(getPS(Statements.FW_SUMMARY_PS)));
+    return workflows;
+  }
+  
+  @Override
+  public Workflows fetchWorkflows(WorkflowFields field, boolean sortAscending, int offset, int limit) throws IOException {
+    if (offset < 0)
+      offset = 0;
+    Workflows workflows = new Workflows();
+    workflows.setWorkflows(fetchWorkflows(getQualifiedPS(Statements.FW_PS, "", field, sortAscending, offset, limit)));
+    workflows.setSummary(fetchSummary(getPS(Statements.FW_SUMMARY_PS)));
+    return workflows;
+  }
+  
+  private List<WorkflowDBEntry> fetchWorkflows(PreparedStatement ps) throws IOException {
     List<WorkflowDBEntry> workflows = new ArrayList<WorkflowDBEntry>();
     ResultSet rs = null;
     try {
       rs = ps.executeQuery();
       while (rs.next()) {
-        WorkflowDBEntry w = new WorkflowDBEntry();
-        w.setWorkflowId(WorkflowFields.WORKFLOWID.getString(rs));
-        w.setWorkflowName(WorkflowFields.WORKFLOWNAME.getString(rs));
-        w.setUserName(WorkflowFields.USERNAME.getString(rs));
-        w.setStartTime(WorkflowFields.STARTTIME.getLong(rs));
-        long updateTime = WorkflowFields.LASTUPDATETIME.getLong(rs);
-        if (updateTime > w.getStartTime())
-          w.setElapsedTime(updateTime - w.getStartTime());
-        else
-          w.setElapsedTime(0);
-        w.setNumJobsTotal(WorkflowFields.NUMJOBSTOTAL.getInt(rs));
-        w.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
-        
w.setWorkflowContext(jsonMapper.readValue(WorkflowFields.WORKFLOWCONTEXT.getString(rs),
 WorkflowContext.class));
-        // For each work flow get the aggregate i/o counts from the job table. 
TODO: clean up the data structures
-        //
-        try {
-          PreparedStatement aggregateio = getPS(Statements.WORKFLOW_AGGREGATE_IO);
-          aggregateio.setString(1, w.getWorkflowId());
-          ResultSet ioresult = aggregateio.executeQuery();
-          if (ioresult.next()) {
-            w.setInputBytes(ioresult.getLong(AGGREGATE_INPUTBYTES));
-            w.setOutputBytes(ioresult.getLong(AGGREGATE_OUTPUTBYTES));
-          }
-        } catch (SQLException e) {
-          throw new IOException(e);
-        }
-        workflows.add(w);
+        workflows.add(getWorkflowDBEntry(rs));
       }
-
     } catch (SQLException e) {
       throw new IOException(e);
-    }finally {
-        try{
-        if (rs!=null)
-            rs.close();
-        } catch (SQLException e) {
-            LOG.error("Exception while closing ResultSet",e);
-        }
+    } finally {
+      try {
+        if (rs != null)
+          rs.close();
+      } catch (SQLException e) {
+        LOG.error("Exception while closing ResultSet", e);
+      }
     }
     return workflows;
   }
   
+  private Summary fetchSummary(PreparedStatement ps) throws IOException {
+    Summary summary = new Summary();
+    ResultSet rs = null;
+    try {
+      rs = ps.executeQuery();
+      if (rs.next()) {
+        summary.setNumRows(SummaryFields.numRows.getInt(rs));
+        summary.setJobs(getAvgData(rs, SummaryFields.avgJobs, SummaryFields.minJobs, SummaryFields.maxJobs));
+        summary.setInput(getAvgData(rs, SummaryFields.avgInput, SummaryFields.minInput, SummaryFields.maxInput));
+        summary.setOutput(getAvgData(rs, SummaryFields.avgOutput, SummaryFields.minOutput, SummaryFields.maxOutput));
+        summary.setDuration(getAvgData(rs, SummaryFields.avgDuration, SummaryFields.minDuration, SummaryFields.maxDuration));
+        Times times = new Times();
+        times.setYoungest(SummaryFields.youngest.getLong(rs));
+        times.setOldest(SummaryFields.oldest.getLong(rs));
+        summary.setTimes(times);
+      }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      try {
+        if (rs != null)
+          rs.close();
+      } catch (SQLException e) {
+        LOG.error("Exception while closing ResultSet", e);
+      }
+    }
+    return summary;
+  }
+  
+  private static WorkflowDBEntry getWorkflowDBEntry(ResultSet rs) throws SQLException, JsonParseException, JsonMappingException, IOException {
+    WorkflowDBEntry w = new WorkflowDBEntry();
+    w.setWorkflowId(WorkflowFields.WORKFLOWID.getString(rs));
+    w.setWorkflowName(WorkflowFields.WORKFLOWNAME.getString(rs));
+    w.setUserName(WorkflowFields.USERNAME.getString(rs));
+    w.setStartTime(WorkflowFields.STARTTIME.getLong(rs));
+    w.setElapsedTime(WorkflowFields.DURATION.getLong(rs));
+    w.setNumJobsTotal(WorkflowFields.NUMJOBSTOTAL.getInt(rs));
+    w.setInputBytes(WorkflowFields.INPUTBYTES.getLong(rs));
+    w.setOutputBytes(WorkflowFields.OUTPUTBYTES.getLong(rs));
+    w.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
+    w.setWorkflowContext(jsonMapper.readValue(WorkflowFields.WORKFLOWCONTEXT.getString(rs), WorkflowContext.class));
+    return w;
+  }
+  
+  private static AvgData getAvgData(ResultSet rs, SummaryFields avg, SummaryFields min, SummaryFields max) throws SQLException {
+    AvgData avgData = new AvgData();
+    avgData.setAvg(avg.getDouble(rs));
+    avgData.setMin(min.getLong(rs));
+    avgData.setMax(max.getLong(rs));
+    return avgData;
+  }
+  
+  @Override
+  public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields col, boolean sortAscending, String searchWorkflowId,
+      String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
+      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException {
+    int total = 0;
+    PreparedStatement ps = getPS(Statements.FW_COUNT_PS);
+    ResultSet rs = null;
+    try {
+      rs = ps.executeQuery();
+      if (rs.next())
+        total = SummaryFields.numRows.getInt(rs);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      try {
+        if (rs != null)
+          rs.close();
+      } catch (SQLException e) {
+        LOG.error("Exception while closing ResultSet", e);
+      }
+    }
+    
+    String searchClause = buildSearchClause(searchTerm, searchWorkflowId, searchWorkflowName, searchWorkflowType, searchUserName, minJobs, maxJobs,
+        minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
+    List<WorkflowDBEntry> workflows = fetchWorkflows(getQualifiedPS(Statements.FW_PS, searchClause, col, sortAscending, offset, limit));
+    Summary summary = fetchSummary(getQualifiedPS(Statements.FW_SUMMARY_PS, searchClause));
+    DataTable table = new DataTable();
+    table.setiTotalRecords(total);
+    table.setiTotalDisplayRecords(summary.getNumRows());
+    table.setAaData(workflows);
+    table.setsEcho(echo);
+    table.setSummary(summary);
+    return table;
+  }
+  
   @Override
   public List<JobDBEntry> fetchJobDetails(String workflowId) throws IOException {
     PreparedStatement ps = getPS(Statements.FJD_PS);
     List<JobDBEntry> jobs = new ArrayList<JobDBEntry>();
-      ResultSet rs = null;
+    ResultSet rs = null;
     try {
       ps.setString(1, workflowId);
       rs = ps.executeQuery();
@@ -185,13 +283,14 @@ public class PostgresConnector implement
       rs.close();
     } catch (SQLException e) {
       throw new IOException(e);
-    }finally {
-        if(rs!=null) try {
-            rs.close();
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
         } catch (SQLException e) {
-            LOG.error("Exception while closing ResultSet",e);
+          LOG.error("Exception while closing ResultSet", e);
         }
-
+      
     }
     return jobs;
   }
@@ -200,7 +299,7 @@ public class PostgresConnector implement
   public long[] fetchJobStartStopTimes(String jobID) throws IOException {
     PreparedStatement ps = getPS(Statements.FJSS_PS);
     long[] times = new long[2];
-      ResultSet rs = null;
+    ResultSet rs = null;
     try {
       ps.setString(1, jobID);
       rs = ps.executeQuery();
@@ -211,11 +310,12 @@ public class PostgresConnector implement
       rs.close();
     } catch (SQLException e) {
       throw new IOException(e);
-    }finally {
-        if (rs!=null) try {
-            rs.close();
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
         } catch (SQLException e) {
-            LOG.error("Exception while closing ResultSet",e);
+          LOG.error("Exception while closing ResultSet", e);
         }
     }
     if (times[1] == 0)
@@ -252,11 +352,12 @@ public class PostgresConnector implement
       rs.close();
     } catch (SQLException e) {
       throw new IOException(e);
-    }finally {
-        if (rs!=null) try {
-            rs.close();
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
         } catch (SQLException e) {
-            LOG.error("Exception while closing ResultSet",e);
+          LOG.error("Exception while closing ResultSet", e);
         }
     }
     return taskAttempts;
@@ -279,13 +380,109 @@ public class PostgresConnector implement
     return preparedStatements.get(statement);
   }
   
+  private PreparedStatement getQualifiedPS(Statements statement, String searchClause) throws IOException {
+    if (db == null)
+      throw new IOException("postgres db not initialized");
+    try {
+      // LOG.debug("preparing " + statement.getStatementString() + searchClause);
+      return db.prepareStatement(statement.getStatementString() + searchClause);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  private PreparedStatement getQualifiedPS(Statements statement, String searchClause, WorkflowFields field, boolean sortAscending, int offset, int limit)
+      throws IOException {
+    if (db == null)
+      throw new IOException("postgres db not initialized");
+    String limitClause = " ORDER BY " + field.toString() + " " + (sortAscending ? SORT_ASC : SORT_DESC) + " NULLS " + (sortAscending ? "FIRST " : "LAST ")
+        + "OFFSET " + offset + (limit >= 0 ? " LIMIT " + limit : "");
+    return getQualifiedPS(statement, searchClause + limitClause);
+  }
+  
+  private static void addRangeSearch(StringBuilder sb, WorkflowFields field, int min, int max) {
+    if (min >= 0)
+      append(sb, greaterThan(field, Integer.toString(min)));
+    if (max >= 0)
+      append(sb, lessThan(field, Integer.toString(max)));
+  }
+  
+  private static void addRangeSearch(StringBuilder sb, WorkflowFields field, long min, long max) {
+    if (min >= 0)
+      append(sb, greaterThan(field, Long.toString(min)));
+    if (max >= 0)
+      append(sb, lessThan(field, Long.toString(max)));
+  }
+  
+  private static void append(StringBuilder sb, String s) {
+    if (sb.length() > WHERE.length())
+      sb.append(" and ");
+    sb.append(s);
+  }
+  
+  private static String like(WorkflowFields field, String s) {
+    return field.toString() + " like '%" + s + "%'";
+  }
+  
+  private static String startsWith(WorkflowFields field, String s) {
+    return field.toString() + " like '" + s + "%'";
+  }
+  
+  private static String equals(WorkflowFields field, String s) {
+    return field.toString() + " = '" + s + "'";
+  }
+  
+  private static String lessThan(WorkflowFields field, String s) {
+    return field.toString() + " <= " + s;
+  }
+  
+  private static String greaterThan(WorkflowFields field, String s) {
+    return field.toString() + " >= " + s;
+  }
+  
+  private static final String WHERE = " where";
+  
+  private static String buildSearchClause(String searchTerm, String searchWorkflowId, String searchWorkflowName, String searchWorkflowType,
+      String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes, long minOutputBytes, long maxOutputBytes, long minDuration,
+      long maxDuration, long minStartTime, long maxStartTime) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(WHERE);
+    if (searchTerm != null && searchTerm.length() > 0) {
+      sb.append(" (");
+      sb.append(like(WorkflowFields.WORKFLOWID, searchTerm));
+      sb.append(" or ");
+      sb.append(like(WorkflowFields.WORKFLOWNAME, searchTerm));
+      sb.append(" or ");
+      sb.append(like(WorkflowFields.USERNAME, searchTerm));
+      sb.append(")");
+    }
+    if (searchWorkflowId != null)
+      append(sb, like(WorkflowFields.WORKFLOWID, searchWorkflowId));
+    if (searchWorkflowName != null)
+      append(sb, like(WorkflowFields.WORKFLOWNAME, searchWorkflowName));
+    if (searchWorkflowType != null)
+      append(sb, startsWith(WorkflowFields.WORKFLOWID, searchWorkflowType));
+    if (searchUserName != null)
+      append(sb, equals(WorkflowFields.USERNAME, searchUserName));
+    addRangeSearch(sb, WorkflowFields.NUMJOBSTOTAL, minJobs, maxJobs);
+    addRangeSearch(sb, WorkflowFields.INPUTBYTES, minInputBytes, maxInputBytes);
+    addRangeSearch(sb, WorkflowFields.OUTPUTBYTES, minOutputBytes, maxOutputBytes);
+    addRangeSearch(sb, WorkflowFields.DURATION, minDuration, maxDuration);
+    addRangeSearch(sb, WorkflowFields.STARTTIME, minStartTime, maxStartTime);
+    
+    if (sb.length() == WHERE.length())
+      return "";
+    else
+      return sb.toString();
+  }
+  
   @Override
   public void close() {
     if (db != null) {
       try {
         db.close();
       } catch (SQLException e) {
-          LOG.error("Exception while closing connector",e);
+        LOG.error("Exception while closing connector", e);
       }
       db = null;
     }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java Fri Jan 11 05:20:30 2013
@@ -16,20 +16,23 @@
  */
 package org.apache.ambari.eventdb.model;
 
-import org.apache.commons.lang.StringUtils;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
+
+import org.apache.ambari.eventdb.model.DataTable.Summary;
+import org.apache.commons.lang.StringUtils;
 
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
 public class Workflows {
   List<WorkflowDBEntry> workflows;
+  Summary summary;
   
   public static class WorkflowDBEntry {
     public static enum WorkflowFields {
@@ -38,8 +41,11 @@ public class Workflows {
       USERNAME,
       STARTTIME,
       LASTUPDATETIME,
+      DURATION,
       NUMJOBSTOTAL,
       NUMJOBSCOMPLETED,
+      INPUTBYTES,
+      OUTPUTBYTES,
       PARENTWORKFLOWID,
       WORKFLOWCONTEXT;
       
@@ -153,19 +159,19 @@ public class Workflows {
     public void setWorkflowContext(WorkflowContext workflowContext) {
       this.workflowContext = workflowContext;
     }
-
+    
     public long getInputBytes() {
       return inputBytes;
     }
-
+    
     public void setInputBytes(long inputBytes) {
       this.inputBytes = inputBytes;
     }
-
+    
     public long getOutputBytes() {
       return outputBytes;
     }
-
+    
     public void setOutputBytes(long outputBytes) {
       this.outputBytes = outputBytes;
     }
@@ -180,4 +186,12 @@ public class Workflows {
   public void setWorkflows(List<WorkflowDBEntry> workflows) {
     this.workflows = workflows;
   }
+  
+  public Summary getSummary() {
+    return summary;
+  }
+  
+  public void setSummary(Summary summary) {
+    this.summary = summary;
+  }
 }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java Fri Jan 11 05:20:30 2013
@@ -17,62 +17,23 @@
 
 package org.apache.ambari.eventdb.webservice;
 
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.api.json.JSONJAXBContext;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.ContextResolver;
-import javax.xml.bind.JAXBContext;
 
-import org.apache.ambari.eventdb.model.TaskLocalityData;
-import org.apache.ambari.eventdb.model.Jobs;
-import org.apache.ambari.eventdb.model.TaskData;
-import org.apache.ambari.eventdb.model.WorkflowContext;
-import org.apache.ambari.eventdb.model.WorkflowDag;
-import org.apache.ambari.eventdb.model.Workflows;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
 
 @Provider
-public class JAXBContextResolver implements ContextResolver<JAXBContext> {
-
-  /* NOTE: Remember to add any new Model classes to this list. */
-  private static final Class[] classes = {
-    WorkflowContext.class,
-    WorkflowDag.class,
-    WorkflowDag.WorkflowDagEntry.class,
-    Jobs.class,
-    Jobs.JobDBEntry.class,
-    Workflows.class,
-    Workflows.WorkflowDBEntry.class,
-    TaskData.class,
-    TaskData.Point.class,
-    TaskLocalityData.class,
-    TaskLocalityData.DataPoint.class
-  };
-
-  private static final Set<Class> types = 
-    new HashSet<Class>(Arrays.asList(classes));
-
-  private static final JAXBContext context;
-
-  static {
-    JAXBContext tmpContext;
-
-    try {
-      tmpContext = new JSONJAXBContext(JSONConfiguration.natural().build(), classes);
-    } catch (Exception e) {
-      /* Do Nothing (with the exception). */
-      tmpContext = null;
-    }
-
-    context = tmpContext;
+public class JAXBContextResolver extends JacksonJaxbJsonProvider {
+  public JAXBContextResolver() {
+    super();
   }
-
+  
   @Override
-  public JAXBContext getContext(Class<?> classType) {
-    return (types.contains(classType)) ? context : null;
+  public ObjectMapper locateMapper(Class<?> type, MediaType mediaType) {
+    ObjectMapper mapper = super.locateMapper(type, mediaType);
+    mapper.setSerializationInclusion(Inclusion.NON_NULL);
+    return mapper;
   }
 }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Fri Jan 11 05:20:30 2013
@@ -34,6 +34,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.ambari.eventdb.db.PostgresConnector;
+import org.apache.ambari.eventdb.model.DataTable;
 import org.apache.ambari.eventdb.model.Jobs;
 import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
 import org.apache.ambari.eventdb.model.TaskAttempt;
@@ -43,6 +44,7 @@ import org.apache.ambari.eventdb.model.T
 import org.apache.ambari.eventdb.model.TaskLocalityData.DataPoint;
 import org.apache.ambari.eventdb.model.Workflows;
 import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
+import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
 
 @Path("/jobhistory")
 public class WorkflowJsonService {
@@ -57,8 +59,12 @@ public class WorkflowJsonService {
   private static final String DEFAULT_USERNAME = "mapred";
   private static final String DEFAULT_PASSWORD = "mapred";
   
-  private static final List<WorkflowDBEntry> EMPTY_WORKFLOWS = Collections.emptyList();
+  private static final Workflows EMPTY_WORKFLOWS = new Workflows();
   private static final List<JobDBEntry> EMPTY_JOBS = Collections.emptyList();
+  {
+    List<WorkflowDBEntry> emptyWorkflows = Collections.emptyList();
+    EMPTY_WORKFLOWS.setWorkflows(emptyWorkflows);
+  }
   
   PostgresConnector getConnector() throws IOException {
     return new PostgresConnector(DEFAULT_HOSTNAME, DEFAULT_DBNAME, DEFAULT_USERNAME, DEFAULT_PASSWORD);
@@ -70,15 +76,23 @@ public class WorkflowJsonService {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/workflow")
-  public Workflows getWorkflows() {
-    Workflows workflows = new Workflows();
+  public Workflows getWorkflows(@QueryParam("orderBy") String field, @DefaultValue(PostgresConnector.SORT_ASC) @QueryParam("sortDir") String sortDir,
+      @DefaultValue("0") @QueryParam("offset") int offset, @DefaultValue("-1") @QueryParam("limit") int limit) {
+    Workflows workflows = EMPTY_WORKFLOWS;
     PostgresConnector conn = null;
     try {
       conn = getConnector();
-      workflows.setWorkflows(conn.fetchWorkflows());
+      if (field == null)
+        workflows = conn.fetchWorkflows();
+      else {
+        field = field.toUpperCase();
+        if ("ELAPSEDTIME".equals(field))
+          field = "DURATION";
+        workflows = conn.fetchWorkflows(WorkflowFields.valueOf(field), sortDir.toUpperCase().equals(PostgresConnector.SORT_ASC), offset, limit);
+      }
     } catch (IOException e) {
       e.printStackTrace();
-      workflows.setWorkflows(EMPTY_WORKFLOWS);
+      workflows = EMPTY_WORKFLOWS;
     } finally {
       if (conn != null) {
         conn.close();
@@ -89,6 +103,77 @@ public class WorkflowJsonService {
   
   @GET
   @Produces(MediaType.APPLICATION_JSON)
+  @Path("/datatable")
+  public DataTable getWorkflowDataTable(@DefaultValue("0") @QueryParam("iDisplayStart") int start,
+      @DefaultValue("10") @QueryParam("iDisplayLength") int amount, @QueryParam("sSearch") String searchTerm, @DefaultValue("0") @QueryParam("sEcho") int echo,
+      @DefaultValue("0") @QueryParam("iSortCol_0") int col, @DefaultValue(PostgresConnector.SORT_ASC) @QueryParam("sSortDir_0") String sdir,
+      @QueryParam("sSearch_0") String workflowId, @QueryParam("sSearch_1") String workflowName, @QueryParam("sSearch_2") String workflowType,
+      @QueryParam("sSearch_3") String userName, @DefaultValue("-1") @QueryParam("minJobs") int minJobs, @DefaultValue("-1") @QueryParam("maxJobs") int maxJobs,
+      @DefaultValue("-1") @QueryParam("minInputBytes") long minInputBytes, @DefaultValue("-1") @QueryParam("maxInputBytes") long maxInputBytes,
+      @DefaultValue("-1") @QueryParam("minOutputBytes") long minOutputBytes, @DefaultValue("-1") @QueryParam("maxOutputBytes") long maxOutputBytes,
+      @DefaultValue("-1") @QueryParam("minDuration") long minDuration, @DefaultValue("-1") @QueryParam("maxDuration") long maxDuration,
+      @DefaultValue("-1") @QueryParam("minStartTime") long minStartTime, @DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime) {
+    
+    if (start < 0)
+      start = 0;
+    if (amount < 10 || amount > 100)
+      amount = 10;
+    
+    boolean sortAscending = true;
+    if (!sdir.toUpperCase().equals(PostgresConnector.SORT_ASC))
+      sortAscending = false;
+    
+    WorkflowFields field = null;
+    switch (col) {
+      case 0: // workflowId
+        field = WorkflowFields.WORKFLOWID;
+        break;
+      case 1: // workflowName
+        field = WorkflowFields.WORKFLOWNAME;
+        break;
+      case 2: // workflowType
+        field = WorkflowFields.WORKFLOWID;
+        break;
+      case 3: // userName
+        field = WorkflowFields.USERNAME;
+        break;
+      case 4: // numJobsTotal
+        field = WorkflowFields.NUMJOBSTOTAL;
+        break;
+      case 5: // inputBytes
+        field = WorkflowFields.INPUTBYTES;
+        break;
+      case 6: // outputBytes
+        field = WorkflowFields.OUTPUTBYTES;
+        break;
+      case 7: // duration
+        field = WorkflowFields.DURATION;
+        break;
+      case 8: // startTime
+        field = WorkflowFields.STARTTIME;
+        break;
+      default:
+        field = WorkflowFields.WORKFLOWID;
+    }
+    
+    DataTable table = null;
+    PostgresConnector conn = null;
+    try {
+      conn = getConnector();
+      table = conn.fetchWorkflows(start, amount, searchTerm, echo, field, sortAscending, workflowId, workflowName, workflowType, userName, minJobs, maxJobs,
+          minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+    return table;
+  }
+  
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
   @Path("/job")
   public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
     Jobs jobs = new Jobs();

Modified: incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py (original)
+++ incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py Fri Jan 11 05:20:30 2013
@@ -91,12 +91,18 @@ def runAgent(passPhrase):
   try:
     # print this to the log.  despite the directory, machine.py works with Python 2.4
     ret = execOsCommand(["python", "/usr/lib/python2.6/site-packages/ambari_agent/machine.py"])
+    if not 0 == ret['exitstatus']:
+      return ret['exitstatus']
     print ret['log']
 
     ret = execOsCommand(["tail", "-20", "/var/log/ambari-agent/ambari-agent.log"])
+    if not 0 == ret['exitstatus']:
+      return ret['exitstatus']
     print ret['log']
+
+    return 0
   except (Exception), e:
-    return
+    return 1
 
 def main(argv=None):
   scriptDir = os.path.realpath(os.path.dirname(argv[0]))
@@ -113,7 +119,7 @@ def main(argv=None):
     installAgent()
 
   configureAgent(hostName)
-  runAgent(passPhrase)
+  sys.exit(runAgent(passPhrase))
   
 if __name__ == '__main__':
   logging.basicConfig(level=logging.DEBUG)

Modified: incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql (original)
+++ incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql Fri Jan 11 05:20:30 2013
@@ -177,6 +177,8 @@ CREATE TABLE workflow (
   workflowContext TEXT, userName TEXT,
   startTime BIGINT, lastUpdateTime BIGINT,
   numJobsTotal INTEGER, numJobsCompleted INTEGER,
+  inputBytes BIGINT, outputBytes BIGINT,
+  duration BIGINT,
   PRIMARY KEY (workflowId),
   FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId)
 );

Modified: incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php (original)
+++ incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php Fri Jan 11 05:20:30 2013
@@ -379,7 +379,7 @@ function hdp_mon_generate_response( $res
       case "HBASE":
       case "ZOOKEEPER":
       case "OOZIE":
-      case "TEMPLETON":
+      case "WEBHCAT":
       case "GANGLIA":
       case "PUPPET":
         break;

Modified: incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java (original)
+++ incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java Fri Jan 11 05:20:30 2013
@@ -141,11 +141,12 @@ public class MapReduceJobHistoryUpdater 
                 "userName, " +
                 "startTime, " +
                 "lastUpdateTime, " +
+                "duration, " +
                 "numJobsTotal, " +
                 "numJobsCompleted" +
                 ") " +
                 "VALUES" +
-                " (?, ?, ?, ?, ?, ?, ?, ?)"
+                " (?, ?, ?, ?, ?, ?, 0, ?, 0)"
             );
     
     workflowUpdateTimePS =
@@ -153,21 +154,33 @@ public class MapReduceJobHistoryUpdater 
             "UPDATE " +
                 WORKFLOW_TABLE +
                 " SET " +
-                "lastUpdateTime = ? " +
+                "lastUpdateTime = ?, " +
+                "duration = ? - (SELECT startTime FROM " +
+                WORKFLOW_TABLE +
+                " WHERE workflowId = ?) " +
                 "WHERE workflowId = ?"
             );
     
     workflowUpdateNumCompletedPS =
         connection.prepareStatement(
-            "UPDATE " +
+            "WITH sums as (SELECT sum(inputBytes) as input, " +
+                "sum(outputBytes) as output, workflowId FROM " +
+                JOB_TABLE +
+                " WHERE workflowId = (SELECT workflowId FROM " +
+                JOB_TABLE +
+                " WHERE jobId = ?) AND status = 'SUCCESS'" +
+                " GROUP BY workflowId) " +
+                "UPDATE " +
                 WORKFLOW_TABLE +
                 " SET " +
                 "lastUpdateTime = ?, " +
-                "numJobsCompleted = numJobsCompleted + 1 " +
-                "WHERE workflowId = " +
-                "(SELECT workflowId FROM " +
-                JOB_TABLE +
-                " WHERE jobId = ?)"
+                "duration = ? - (SELECT startTime FROM " +
+                WORKFLOW_TABLE +
+                " WHERE workflowId = (SELECT workflowId FROM sums)), " +
+                "numJobsCompleted = numJobsCompleted + 1, " +
+                "inputBytes = (select input from sums), " +
+                "outputBytes = (select output from sums) " +
+                "WHERE workflowId = (select workflowId from sums)"
             );
     
     // JobFinishedEvent
@@ -469,13 +482,13 @@ public class MapReduceJobHistoryUpdater 
           workflowUpdateTimePS, originalEvent, 
           (JobSubmittedEvent)parsedEvent);
     } else if (eventClass == JobFinishedEvent.class) {
-      processJobFinishedEvent(entityPS, 
+      processJobFinishedEvent(entityPS, workflowUpdateNumCompletedPS,
           originalEvent, (JobFinishedEvent)parsedEvent);
     } else if (eventClass == JobInitedEvent.class){
       processJobInitedEvent(entityPS, 
           originalEvent, (JobInitedEvent)parsedEvent);
     } else if (eventClass == JobStatusChangedEvent.class) {
-      processJobStatusChangedEvent(entityPS, workflowUpdateNumCompletedPS,
+      processJobStatusChangedEvent(entityPS,
           originalEvent, (JobStatusChangedEvent)parsedEvent);
     } else if (eventClass == JobInfoChangeEvent.class) {
       processJobInfoChangeEvent(entityPS, 
@@ -636,13 +649,14 @@ public class MapReduceJobHistoryUpdater 
         workflowPS.setLong(5, historyEvent.getSubmitTime());
         workflowPS.setLong(6, historyEvent.getSubmitTime());
         workflowPS.setLong(7, workflowContext.getWorkflowDag().size());
-        workflowPS.setLong(8, 0);
         workflowPS.executeUpdate();
         LOG.debug("Successfully inserted workflowId = " + 
             workflowContext.getWorkflowId());
       } else {
         workflowUpdateTimePS.setLong(1, historyEvent.getSubmitTime());
-        workflowUpdateTimePS.setString(2, workflowContext.getWorkflowId());
+        workflowUpdateTimePS.setLong(2, historyEvent.getSubmitTime());
+        workflowUpdateTimePS.setString(3, workflowContext.getWorkflowId());
+        workflowUpdateTimePS.setString(4, workflowContext.getWorkflowId());
         workflowUpdateTimePS.executeUpdate();
         LOG.debug("Successfully updated workflowId = " + 
             workflowContext.getWorkflowId());
@@ -666,6 +680,7 @@ public class MapReduceJobHistoryUpdater 
   
   private void processJobFinishedEvent(
       PreparedStatement entityPS,
+      PreparedStatement workflowUpdateNumCompletedPS,
       LoggingEvent logEvent, JobFinishedEvent historyEvent) {
     Counters counters = historyEvent.getMapCounters();
     long inputBytes = 0;
@@ -698,6 +713,11 @@ public class MapReduceJobHistoryUpdater 
       entityPS.setLong(7, outputBytes);
       entityPS.setString(8, historyEvent.getJobid().toString());
       entityPS.executeUpdate();
+      // job finished events always have success status
+      workflowUpdateNumCompletedPS.setString(1, historyEvent.getJobid().toString());
+      workflowUpdateNumCompletedPS.setLong(2, historyEvent.getFinishTime());
+      workflowUpdateNumCompletedPS.setLong(3, historyEvent.getFinishTime());
+      workflowUpdateNumCompletedPS.executeUpdate();
     } catch (SQLException sqle) {
       LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
           historyEvent.getJobid() + " into " + JOB_TABLE, sqle);
@@ -725,18 +745,11 @@ public class MapReduceJobHistoryUpdater 
 
   private void processJobStatusChangedEvent(
       PreparedStatement entityPS, 
-      PreparedStatement workflowUpdateNumCompletedPS, 
       LoggingEvent logEvent, JobStatusChangedEvent historyEvent) {
     try {
       entityPS.setString(1, historyEvent.getStatus());
       entityPS.setString(2, historyEvent.getJobId().toString());
       entityPS.executeUpdate();
-      if ("SUCCESS".equals(historyEvent.getStatus())) {
-        workflowUpdateNumCompletedPS.setLong(1, System.currentTimeMillis());
-        workflowUpdateNumCompletedPS.setString(2, 
-            historyEvent.getJobId().toString());
-        workflowUpdateNumCompletedPS.executeUpdate();
-      }
     } catch (SQLException sqle) {
       LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
           historyEvent.getJobId() + " into " + JOB_TABLE, sqle);


Reply via email to