Author: mahadev
Date: Fri Jan 11 05:20:30 2013
New Revision: 1431892
URL: http://svn.apache.org/viewvc?rev=1431892&view=rev
Log:
AMBARI-1114. BootStrap fails but the API says that it is done and the exit
status is 0. (Nate Cole via mahadev)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py
incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Jan 11 05:20:30 2013
@@ -16,6 +16,9 @@ should be listed by their full name.
returns 200 (no op), even though GET with the same predicate returns a number
of host_components. (Tom Beerbower via mahadev)
+ AMBARI-1114. BootStrap fails but the API says that it is done and the exit
+ status is 0. (Nate Cole via mahadev)
+
BUG FIXES
Modified:
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/server.pp
Fri Jan 11 05:20:30 2013
@@ -38,6 +38,17 @@ class hdp-mysql::server(
require => Anchor['hdp-mysql::server::begin']
}
+
+ if ($hdp::params::hdp_os_type == "suse") {
+ # On SUSE, create a symlink at the expected /var/run location that points to the default mysqld pid file
+ file { '/var/run/mysqld.pid':
+ ensure => 'link',
+ target => '/var/lib/mysql/mysqld.pid',
+ require => Hdp::Package['mysql'],
+ }
+ }
+
+
if hdp_is_empty($hdp::params::services_names[mysql]) {
hdp_fail("There is no service name for service mysql")
}
@@ -58,28 +69,52 @@ class hdp-mysql::server(
$service_name = $service_name_by_os[$hdp::params::hdp_os_type]
}
- service {$service_name:
- ensure => running,
- require => Hdp::Package['mysql'],
- notify => File['/tmp/startMysql.sh']
- }
-
- file { '/tmp/startMysql.sh':
- ensure => present,
- source => "puppet:///modules/hdp-mysql/startMysql.sh",
- mode => '0755',
- require => service[$service_name],
- notify => Exec['/tmp/startMysql.sh']
- }
-
- exec { '/tmp/startMysql.sh':
- command => "sh /tmp/startMysql.sh ${db_user} ${db_pw} ${host}",
- tries => 3,
- try_sleep => 5,
- require => File['/tmp/startMysql.sh'],
- path => '/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
- notify => Anchor['hdp-mysql::server::end'],
- logoutput => "true"
+ $mysqld_state = $service_state ? {
+ 'running' => 'running',
+ default => 'stopped',
+ }
+
+ if ($hdp::params::hdp_os_type == "suse") {
+ service {$service_name:
+ ensure => $mysqld_state,
+ require => File['/var/run/mysqld.pid'],
+ notify => File['/tmp/addMysqlUser.sh']
+ }
+ } else {
+ service {$service_name:
+ ensure => $mysqld_state,
+ require => Hdp::Package['mysql'],
+ notify => File['/tmp/addMysqlUser.sh']
+ }
+ }
+
+
+ if ($service_state == 'installed_and_configured') {
+
+ file {'/tmp/addMysqlUser.sh':
+ ensure => present,
+ source => "puppet:///modules/hdp-mysql/addMysqlUser.sh",
+ mode => '0755',
+ require => Service[$service_name],
+ notify => Exec['/tmp/addMysqlUser.sh'],
+ }
+ # We start the DB and add a user
+ exec { '/tmp/addMysqlUser.sh':
+ command => "sh /tmp/addMysqlUser.sh ${service_name} ${db_user}
${db_pw} ${host}",
+ tries => 3,
+ try_sleep => 5,
+ require => File['/tmp/addMysqlUser.sh'],
+ path => '/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
+ notify => Anchor['hdp-mysql::server::end'],
+ logoutput => "true",
+ }
+ } else {
+ # Now MySQL is running so we remove the temporary file
+ file {'/tmp/addMysqlUser.sh':
+ ensure => absent,
+ require => Service[$service_name],
+ notify => Anchor['hdp-mysql::server::end'],
+ }
}
anchor { 'hdp-mysql::server::end':}
Modified:
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-servicegroups.cfg.erb
Fri Jan 11 05:20:30 2013
@@ -15,8 +15,8 @@ define servicegroup {
alias OOZIE Checks
}
define servicegroup {
- servicegroup_name TEMPLETON
- alias TEMPLETON Checks
+ servicegroup_name WEBHCAT
+ alias WEBHCAT Checks
}
define servicegroup {
servicegroup_name NAGIOS
Modified:
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb
Fri Jan 11 05:20:30 2013
@@ -449,13 +449,13 @@ define service {
max_check_attempts 3
}
<%end-%>
-<%if scope.function_hdp_nagios_members_exist('templeton-server')-%>
-# Templeton check
+<%if scope.function_hdp_nagios_members_exist('webhcat-server')-%>
+# WEBHCAT check
define service {
- hostgroup_name templeton-server
+ hostgroup_name webhcat-server
use hadoop-service
- service_description TEMPLETON::Templeton status check
- servicegroups TEMPLETON
+ service_description WEBHCAT::WEBHCAT status check
+ servicegroups WEBHCAT
<%if scope.function_hdp_template_var("security_enabled")-%>
check_command
check_templeton_status!50111!v1!true!<%=scope.function_hdp_template_var("keytab_path")%>/<%=scope.function_hdp_template_var("nagios_user")%>.headless.keytab!<%=scope.function_hdp_template_var("nagios_user")%>
<%else-%>
Modified:
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
Fri Jan 11 05:20:30 2013
@@ -40,11 +40,24 @@ class hdp-oozie::service(
$lzo_jar_suffix = ""
}
+
if ($ensure == 'running') {
- $daemon_cmd = "/bin/sh -c 'cd /usr/lib/oozie && tar -xvf
oozie-sharelib.tar.gz && mkdir -p ${oozie_tmp} && chown ${user}:hadoop
${oozie_tmp} && cd ${oozie_tmp}' && su - ${user} -c
'/usr/lib/oozie/bin/oozie-setup.sh -hadoop 0.20.200 $jar_location -extjs
$ext_js_path $lzo_jar_suffix && /usr/lib/oozie/bin/ooziedb.sh create -sqlfile
oozie.sql -run ; hadoop dfs -put /usr/lib/oozie/share share ; hadoop dfs -chmod
-R 755 /user/${user}/share && /usr/lib/oozie/bin/oozie-start.sh' "
+
+ $cmd1 = "cd /usr/lib/oozie && tar -xvf oozie-sharelib.tar.gz"
+ $cmd2 = "cd /usr/lib/oozie && mkdir -p ${oozie_tmp}"
+ $cmd3 = "cd /usr/lib/oozie && chown ${user}:hadoop ${oozie_tmp}"
+
+ $cmd4 = "cd ${oozie_tmp} && /usr/lib/oozie/bin/oozie-setup.sh -hadoop
0.20.200 $jar_location -extjs $ext_js_path $lzo_jar_suffix"
+ $cmd5 = "cd ${oozie_tmp} && /usr/lib/oozie/bin/ooziedb.sh create -sqlfile
oozie.sql -run ; hadoop dfs -put /usr/lib/oozie/share share ; hadoop dfs -chmod
-R 755 /user/${user}/share"
+ $cmd6 = "/usr/lib/oozie/bin/oozie-start.sh"
+
+
+ $sh_cmds = [$cmd1, $cmd2, $cmd3]
+ $user_cmds = [$cmd4, $cmd5, $cmd6]
+
$no_op_test = "ls ${pid_file} >/dev/null 2>&1 && ps `cat ${pid_file}`
>/dev/null 2>&1"
} elsif ($ensure == 'stopped') {
- $daemon_cmd = "su - ${user} -c 'cd ${oozie_tmp} &&
/usr/lib/oozie/bin/oozie-stop.sh'"
+ $stop_cmd = "su - ${user} -c 'cd ${oozie_tmp} &&
/usr/lib/oozie/bin/oozie-stop.sh'"
$no_op_test = undef
} else {
$daemon_cmd = undef
@@ -58,13 +71,16 @@ class hdp-oozie::service(
anchor{'hdp-oozie::service::begin':} -> Hdp-oozie::Service::Directory<||> ->
anchor{'hdp-oozie::service::end':}
- if ($daemon_cmd != undef) {
- hdp::exec { $daemon_cmd:
- command => $daemon_cmd,
+ if ($ensure == 'running') {
+ hdp-oozie::service::exec_sh{$sh_cmds:}
+ hdp-oozie::service::exec_user{$user_cmds:}
+ Hdp-oozie::Service::Directory<||> -> Hdp-oozie::Service::Exec_sh[$cmd1] ->
Hdp-oozie::Service::Exec_sh[$cmd2] ->Hdp-oozie::Service::Exec_sh[$cmd3] ->
Hdp-oozie::Service::Exec_user[$cmd4] ->Hdp-oozie::Service::Exec_user[$cmd5] ->
Hdp-oozie::Service::Exec_user[$cmd6] -> Anchor['hdp-oozie::service::end']
+ } elsif ($ensure == 'stopped') {
+ hdp::exec { "exec $stop_cmd":
+ command => $stop_cmd,
unless => $no_op_test,
initial_wait => $initial_wait
- }
- Hdp-oozie::Service::Directory<||> -> Hdp::Exec[$daemon_cmd] ->
Anchor['hdp-oozie::service::end']
+ }
}
}
@@ -84,3 +100,23 @@ define hdp-oozie::service::createsymlink
unless => "test -e /usr/lib/oozie/oozie-server/lib/mapred-site.xml"
}
}
+
+define hdp-oozie::service::exec_sh()
+{
+ $no_op_test = "ls ${pid_file} >/dev/null 2>&1 && ps `cat ${pid_file}`
>/dev/null 2>&1"
+ hdp::exec { "exec $name":
+ command => "/bin/sh -c '$name'",
+ unless => $no_op_test,
+ initial_wait => $initial_wait
+ }
+}
+
+define hdp-oozie::service::exec_user()
+{
+ $no_op_test = "ls ${pid_file} >/dev/null 2>&1 && ps `cat ${pid_file}`
>/dev/null 2>&1"
+ hdp::exec { "exec $name":
+ command => "su - ${user} -c '$name'",
+ unless => $no_op_test,
+ initial_wait => $initial_wait
+ }
+}
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
Fri Jan 11 05:20:30 2013
@@ -19,17 +19,25 @@ package org.apache.ambari.eventdb.db;
import java.io.IOException;
import java.util.List;
+import org.apache.ambari.eventdb.model.DataTable;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
import org.apache.ambari.eventdb.model.TaskAttempt;
import org.apache.ambari.eventdb.model.WorkflowContext;
-import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
+import org.apache.ambari.eventdb.model.Workflows;
+import
org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
public interface DBConnector {
public void submitJob(JobDBEntry j, WorkflowContext context) throws
IOException;
public void updateJob(JobDBEntry j) throws IOException;
- public List<WorkflowDBEntry> fetchWorkflows() throws IOException;
+ public Workflows fetchWorkflows() throws IOException;
+
+ public Workflows fetchWorkflows(WorkflowFields field, boolean sortAscending,
int offset, int limit) throws IOException;
+
+ public DataTable fetchWorkflows(int offset, int limit, String searchTerm,
int echo, WorkflowFields field, boolean sortAscending, String searchWorkflowId,
+ String searchWorkflowName, String searchWorkflowType, String
searchUserName, int minJobs, int maxJobs, long minInputBytes, long
maxInputBytes,
+ long minOutputBytes, long maxOutputBytes, long minDuration, long
maxDuration, long minStartTime, long maxStartTime) throws IOException;
public List<JobDBEntry> fetchJobDetails(String workflowID) throws
IOException;
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
Fri Jan 11 05:20:30 2013
@@ -16,32 +16,44 @@
*/
package org.apache.ambari.eventdb.db;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.eventdb.model.DataTable;
+import org.apache.ambari.eventdb.model.DataTable.AvgData;
+import org.apache.ambari.eventdb.model.DataTable.Summary;
+import org.apache.ambari.eventdb.model.DataTable.Summary.SummaryFields;
+import org.apache.ambari.eventdb.model.DataTable.Times;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry.JobFields;
import org.apache.ambari.eventdb.model.TaskAttempt;
import org.apache.ambari.eventdb.model.TaskAttempt.TaskAttemptFields;
import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.Workflows;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
import
org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
-import java.io.IOException;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-
public class PostgresConnector implements DBConnector {
private static Log LOG = LogFactory.getLog(PostgresConnector.class);
private static final String WORKFLOW_TABLE_NAME = "workflow";
private static final String JOB_TABLE_NAME = "job";
private static final String TASK_ATTEMPT_TABLE_NAME = "taskattempt";
- private static final String AGGREGATE_INPUTBYTES = "inputBytes";
- private static final String AGGREGATE_OUTPUTBYTES = "outputBytes";
+ public static final String SORT_ASC = "ASC";
+ public static final String SORT_DESC = "DESC";
private static final ObjectMapper jsonMapper = new ObjectMapper();
@@ -54,9 +66,15 @@ public class PostgresConnector implement
UJ_UPDATE_JOB_PS(""),
UJ_UPDATE_WORKFLOW_PS(""),
FW_PS("SELECT " + WorkflowDBEntry.WORKFLOW_FIELDS + " FROM " +
WORKFLOW_TABLE_NAME),
+ FW_COUNT_PS("SELECT count(*) as " + SummaryFields.numRows + " FROM " +
WORKFLOW_TABLE_NAME),
+ FW_SUMMARY_PS("SELECT count(*) as " + SummaryFields.numRows + ", "
+ + getAvg(WorkflowFields.NUMJOBSTOTAL, SummaryFields.avgJobs,
SummaryFields.minJobs, SummaryFields.maxJobs) + ", "
+ + getAvg(WorkflowFields.INPUTBYTES, SummaryFields.avgInput,
SummaryFields.minInput, SummaryFields.maxInput) + ", "
+ + getAvg(WorkflowFields.OUTPUTBYTES, SummaryFields.avgOutput,
SummaryFields.minOutput, SummaryFields.maxOutput) + ", "
+ + getAvg(WorkflowFields.DURATION, SummaryFields.avgDuration,
SummaryFields.minDuration, SummaryFields.maxDuration) + ", min("
+ + WorkflowFields.STARTTIME + ") as " + SummaryFields.youngest + ",
max(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.oldest + " FROM "
+ + WORKFLOW_TABLE_NAME),
FJD_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + "
WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
- WORKFLOW_AGGREGATE_IO("SELECT SUM(" + JobFields.INPUTBYTES + ") as " +
AGGREGATE_INPUTBYTES + ", SUM(" + JobFields.OUTPUTBYTES + ") as "
- + AGGREGATE_OUTPUTBYTES + " FROM " + JOB_TABLE_NAME + " WHERE " +
JobFields.WORKFLOWID.toString() + " = ?"),
FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + "
FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
FTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " +
TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? AND "
+ TaskAttemptFields.TASKTYPE + " = ? ORDER BY " +
TaskAttemptFields.STARTTIME);
@@ -70,6 +88,10 @@ public class PostgresConnector implement
public String getStatementString() {
return statementString;
}
+
+ private static String getAvg(WorkflowFields field, SummaryFields avg,
SummaryFields min, SummaryFields max) {
+ return "avg(" + field + ") as " + avg + ", min(" + field + ") as " + min
+ ", max(" + field + ") as " + max;
+ }
}
private Map<Statements,PreparedStatement> preparedStatements = new
EnumMap<Statements,PreparedStatement>(Statements.class);
@@ -90,74 +112,150 @@ public class PostgresConnector implement
@Override
public void submitJob(JobDBEntry j, WorkflowContext context) throws
IOException {
-// PreparedStatement insertJobPS = getPS(Statements.SJ_INSERT_JOB_PS);
-// PreparedStatement checkWorkflowPS =
getPS(Statements.SJ_CHECK_WORKFLOW_PS);
-// PreparedStatement insertWorkflowPS =
getPS(Statements.SJ_INSERT_WORKFLOW_PS);
+ // PreparedStatement insertJobPS = getPS(Statements.SJ_INSERT_JOB_PS);
+ // PreparedStatement checkWorkflowPS =
getPS(Statements.SJ_CHECK_WORKFLOW_PS);
+ // PreparedStatement insertWorkflowPS =
getPS(Statements.SJ_INSERT_WORKFLOW_PS);
throw new NotImplementedException();
}
@Override
public void updateJob(JobDBEntry j) throws IOException {
-// PreparedStatement updateJobPS = getPS(Statements.UJ_UPDATE_JOB_PS);
-// PreparedStatement updateWorkflowPS =
getPS(Statements.UJ_UPDATE_WORKFLOW_PS);
+ // PreparedStatement updateJobPS = getPS(Statements.UJ_UPDATE_JOB_PS);
+ // PreparedStatement updateWorkflowPS =
getPS(Statements.UJ_UPDATE_WORKFLOW_PS);
throw new NotImplementedException();
}
@Override
- public List<WorkflowDBEntry> fetchWorkflows() throws IOException {
- PreparedStatement ps = getPS(Statements.FW_PS);
+ public Workflows fetchWorkflows() throws IOException {
+ Workflows workflows = new Workflows();
+ workflows.setWorkflows(fetchWorkflows(getPS(Statements.FW_PS)));
+ workflows.setSummary(fetchSummary(getPS(Statements.FW_SUMMARY_PS)));
+ return workflows;
+ }
+
+ @Override
+ public Workflows fetchWorkflows(WorkflowFields field, boolean sortAscending,
int offset, int limit) throws IOException {
+ if (offset < 0)
+ offset = 0;
+ Workflows workflows = new Workflows();
+ workflows.setWorkflows(fetchWorkflows(getQualifiedPS(Statements.FW_PS, "",
field, sortAscending, offset, limit)));
+ workflows.setSummary(fetchSummary(getPS(Statements.FW_SUMMARY_PS)));
+ return workflows;
+ }
+
+ private List<WorkflowDBEntry> fetchWorkflows(PreparedStatement ps) throws
IOException {
List<WorkflowDBEntry> workflows = new ArrayList<WorkflowDBEntry>();
ResultSet rs = null;
try {
rs = ps.executeQuery();
while (rs.next()) {
- WorkflowDBEntry w = new WorkflowDBEntry();
- w.setWorkflowId(WorkflowFields.WORKFLOWID.getString(rs));
- w.setWorkflowName(WorkflowFields.WORKFLOWNAME.getString(rs));
- w.setUserName(WorkflowFields.USERNAME.getString(rs));
- w.setStartTime(WorkflowFields.STARTTIME.getLong(rs));
- long updateTime = WorkflowFields.LASTUPDATETIME.getLong(rs);
- if (updateTime > w.getStartTime())
- w.setElapsedTime(updateTime - w.getStartTime());
- else
- w.setElapsedTime(0);
- w.setNumJobsTotal(WorkflowFields.NUMJOBSTOTAL.getInt(rs));
- w.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
-
w.setWorkflowContext(jsonMapper.readValue(WorkflowFields.WORKFLOWCONTEXT.getString(rs),
WorkflowContext.class));
- // For each work flow get the aggregate i/o counts from the job table.
TODO: clean up the data structures
- //
- try {
- PreparedStatement aggregateio =
getPS(Statements.WORKFLOW_AGGREGATE_IO);
- aggregateio.setString(1, w.getWorkflowId());
- ResultSet ioresult = aggregateio.executeQuery();
- if (ioresult.next()) {
- w.setInputBytes(ioresult.getLong(AGGREGATE_INPUTBYTES));
- w.setOutputBytes(ioresult.getLong(AGGREGATE_OUTPUTBYTES));
- }
- } catch (SQLException e) {
- throw new IOException(e);
- }
- workflows.add(w);
+ workflows.add(getWorkflowDBEntry(rs));
}
-
} catch (SQLException e) {
throw new IOException(e);
- }finally {
- try{
- if (rs!=null)
- rs.close();
- } catch (SQLException e) {
- LOG.error("Exception while closing ResultSet",e);
- }
+ } finally {
+ try {
+ if (rs != null)
+ rs.close();
+ } catch (SQLException e) {
+ LOG.error("Exception while closing ResultSet", e);
+ }
}
return workflows;
}
+ private Summary fetchSummary(PreparedStatement ps) throws IOException {
+ Summary summary = new Summary();
+ ResultSet rs = null;
+ try {
+ rs = ps.executeQuery();
+ if (rs.next()) {
+ summary.setNumRows(SummaryFields.numRows.getInt(rs));
+ summary.setJobs(getAvgData(rs, SummaryFields.avgJobs,
SummaryFields.minJobs, SummaryFields.maxJobs));
+ summary.setInput(getAvgData(rs, SummaryFields.avgInput,
SummaryFields.minInput, SummaryFields.maxInput));
+ summary.setOutput(getAvgData(rs, SummaryFields.avgOutput,
SummaryFields.minOutput, SummaryFields.maxOutput));
+ summary.setDuration(getAvgData(rs, SummaryFields.avgDuration,
SummaryFields.minDuration, SummaryFields.maxDuration));
+ Times times = new Times();
+ times.setYoungest(SummaryFields.youngest.getLong(rs));
+ times.setOldest(SummaryFields.oldest.getLong(rs));
+ summary.setTimes(times);
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ if (rs != null)
+ rs.close();
+ } catch (SQLException e) {
+ LOG.error("Exception while closing ResultSet", e);
+ }
+ }
+ return summary;
+ }
+
+ private static WorkflowDBEntry getWorkflowDBEntry(ResultSet rs) throws
SQLException, JsonParseException, JsonMappingException, IOException {
+ WorkflowDBEntry w = new WorkflowDBEntry();
+ w.setWorkflowId(WorkflowFields.WORKFLOWID.getString(rs));
+ w.setWorkflowName(WorkflowFields.WORKFLOWNAME.getString(rs));
+ w.setUserName(WorkflowFields.USERNAME.getString(rs));
+ w.setStartTime(WorkflowFields.STARTTIME.getLong(rs));
+ w.setElapsedTime(WorkflowFields.DURATION.getLong(rs));
+ w.setNumJobsTotal(WorkflowFields.NUMJOBSTOTAL.getInt(rs));
+ w.setInputBytes(WorkflowFields.INPUTBYTES.getLong(rs));
+ w.setOutputBytes(WorkflowFields.OUTPUTBYTES.getLong(rs));
+ w.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
+
w.setWorkflowContext(jsonMapper.readValue(WorkflowFields.WORKFLOWCONTEXT.getString(rs),
WorkflowContext.class));
+ return w;
+ }
+
+ private static AvgData getAvgData(ResultSet rs, SummaryFields avg,
SummaryFields min, SummaryFields max) throws SQLException {
+ AvgData avgData = new AvgData();
+ avgData.setAvg(avg.getDouble(rs));
+ avgData.setMin(min.getLong(rs));
+ avgData.setMax(max.getLong(rs));
+ return avgData;
+ }
+
+ @Override
+ public DataTable fetchWorkflows(int offset, int limit, String searchTerm,
int echo, WorkflowFields col, boolean sortAscending, String searchWorkflowId,
+ String searchWorkflowName, String searchWorkflowType, String
searchUserName, int minJobs, int maxJobs, long minInputBytes, long
maxInputBytes,
+ long minOutputBytes, long maxOutputBytes, long minDuration, long
maxDuration, long minStartTime, long maxStartTime) throws IOException {
+ int total = 0;
+ PreparedStatement ps = getPS(Statements.FW_COUNT_PS);
+ ResultSet rs = null;
+ try {
+ rs = ps.executeQuery();
+ if (rs.next())
+ total = SummaryFields.numRows.getInt(rs);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ if (rs != null)
+ rs.close();
+ } catch (SQLException e) {
+ LOG.error("Exception while closing ResultSet", e);
+ }
+ }
+
+ String searchClause = buildSearchClause(searchTerm, searchWorkflowId,
searchWorkflowName, searchWorkflowType, searchUserName, minJobs, maxJobs,
+ minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes,
minDuration, maxDuration, minStartTime, maxStartTime);
+ List<WorkflowDBEntry> workflows =
fetchWorkflows(getQualifiedPS(Statements.FW_PS, searchClause, col,
sortAscending, offset, limit));
+ Summary summary = fetchSummary(getQualifiedPS(Statements.FW_SUMMARY_PS,
searchClause));
+ DataTable table = new DataTable();
+ table.setiTotalRecords(total);
+ table.setiTotalDisplayRecords(summary.getNumRows());
+ table.setAaData(workflows);
+ table.setsEcho(echo);
+ table.setSummary(summary);
+ return table;
+ }
+
@Override
public List<JobDBEntry> fetchJobDetails(String workflowId) throws
IOException {
PreparedStatement ps = getPS(Statements.FJD_PS);
List<JobDBEntry> jobs = new ArrayList<JobDBEntry>();
- ResultSet rs = null;
+ ResultSet rs = null;
try {
ps.setString(1, workflowId);
rs = ps.executeQuery();
@@ -185,13 +283,14 @@ public class PostgresConnector implement
rs.close();
} catch (SQLException e) {
throw new IOException(e);
- }finally {
- if(rs!=null) try {
- rs.close();
+ } finally {
+ if (rs != null)
+ try {
+ rs.close();
} catch (SQLException e) {
- LOG.error("Exception while closing ResultSet",e);
+ LOG.error("Exception while closing ResultSet", e);
}
-
+
}
return jobs;
}
@@ -200,7 +299,7 @@ public class PostgresConnector implement
public long[] fetchJobStartStopTimes(String jobID) throws IOException {
PreparedStatement ps = getPS(Statements.FJSS_PS);
long[] times = new long[2];
- ResultSet rs = null;
+ ResultSet rs = null;
try {
ps.setString(1, jobID);
rs = ps.executeQuery();
@@ -211,11 +310,12 @@ public class PostgresConnector implement
rs.close();
} catch (SQLException e) {
throw new IOException(e);
- }finally {
- if (rs!=null) try {
- rs.close();
+ } finally {
+ if (rs != null)
+ try {
+ rs.close();
} catch (SQLException e) {
- LOG.error("Exception while closing ResultSet",e);
+ LOG.error("Exception while closing ResultSet", e);
}
}
if (times[1] == 0)
@@ -252,11 +352,12 @@ public class PostgresConnector implement
rs.close();
} catch (SQLException e) {
throw new IOException(e);
- }finally {
- if (rs!=null) try {
- rs.close();
+ } finally {
+ if (rs != null)
+ try {
+ rs.close();
} catch (SQLException e) {
- LOG.error("Exception while closing ResultSet",e);
+ LOG.error("Exception while closing ResultSet", e);
}
}
return taskAttempts;
@@ -279,13 +380,109 @@ public class PostgresConnector implement
return preparedStatements.get(statement);
}
+ private PreparedStatement getQualifiedPS(Statements statement, String
searchClause) throws IOException {
+ if (db == null)
+ throw new IOException("postgres db not initialized");
+ try {
+ // LOG.debug("preparing " + statement.getStatementString() +
searchClause);
+ return db.prepareStatement(statement.getStatementString() +
searchClause);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private PreparedStatement getQualifiedPS(Statements statement, String
searchClause, WorkflowFields field, boolean sortAscending, int offset, int
limit)
+ throws IOException {
+ if (db == null)
+ throw new IOException("postgres db not initialized");
+ String limitClause = " ORDER BY " + field.toString() + " " +
(sortAscending ? SORT_ASC : SORT_DESC) + " NULLS " + (sortAscending ? "FIRST "
: "LAST ")
+ + "OFFSET " + offset + (limit >= 0 ? " LIMIT " + limit : "");
+ return getQualifiedPS(statement, searchClause + limitClause);
+ }
+
+ private static void addRangeSearch(StringBuilder sb, WorkflowFields field,
int min, int max) {
+ if (min >= 0)
+ append(sb, greaterThan(field, Integer.toString(min)));
+ if (max >= 0)
+ append(sb, lessThan(field, Integer.toString(max)));
+ }
+
+ private static void addRangeSearch(StringBuilder sb, WorkflowFields field,
long min, long max) {
+ if (min >= 0)
+ append(sb, greaterThan(field, Long.toString(min)));
+ if (max >= 0)
+ append(sb, lessThan(field, Long.toString(max)));
+ }
+
+ private static void append(StringBuilder sb, String s) {
+ if (sb.length() > WHERE.length())
+ sb.append(" and ");
+ sb.append(s);
+ }
+
+ private static String like(WorkflowFields field, String s) {
+ return field.toString() + " like '%" + s + "%'";
+ }
+
+ private static String startsWith(WorkflowFields field, String s) {
+ return field.toString() + " like '" + s + "%'";
+ }
+
+ private static String equals(WorkflowFields field, String s) {
+ return field.toString() + " = '" + s + "'";
+ }
+
+ private static String lessThan(WorkflowFields field, String s) {
+ return field.toString() + " <= " + s;
+ }
+
+ private static String greaterThan(WorkflowFields field, String s) {
+ return field.toString() + " >= " + s;
+ }
+
+ private static final String WHERE = " where";
+
+ private static String buildSearchClause(String searchTerm, String
searchWorkflowId, String searchWorkflowName, String searchWorkflowType,
+ String searchUserName, int minJobs, int maxJobs, long minInputBytes,
long maxInputBytes, long minOutputBytes, long maxOutputBytes, long minDuration,
+ long maxDuration, long minStartTime, long maxStartTime) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(WHERE);
+ if (searchTerm != null && searchTerm.length() > 0) {
+ sb.append(" (");
+ sb.append(like(WorkflowFields.WORKFLOWID, searchTerm));
+ sb.append(" or ");
+ sb.append(like(WorkflowFields.WORKFLOWNAME, searchTerm));
+ sb.append(" or ");
+ sb.append(like(WorkflowFields.USERNAME, searchTerm));
+ sb.append(")");
+ }
+ if (searchWorkflowId != null)
+ append(sb, like(WorkflowFields.WORKFLOWID, searchWorkflowId));
+ if (searchWorkflowName != null)
+ append(sb, like(WorkflowFields.WORKFLOWNAME, searchWorkflowName));
+ if (searchWorkflowType != null)
+ append(sb, startsWith(WorkflowFields.WORKFLOWID, searchWorkflowType));
+ if (searchUserName != null)
+ append(sb, equals(WorkflowFields.USERNAME, searchUserName));
+ addRangeSearch(sb, WorkflowFields.NUMJOBSTOTAL, minJobs, maxJobs);
+ addRangeSearch(sb, WorkflowFields.INPUTBYTES, minInputBytes,
maxInputBytes);
+ addRangeSearch(sb, WorkflowFields.OUTPUTBYTES, minOutputBytes,
maxOutputBytes);
+ addRangeSearch(sb, WorkflowFields.DURATION, minDuration, maxDuration);
+ addRangeSearch(sb, WorkflowFields.STARTTIME, minStartTime, maxStartTime);
+
+ if (sb.length() == WHERE.length())
+ return "";
+ else
+ return sb.toString();
+ }
+
@Override
public void close() {
if (db != null) {
try {
db.close();
} catch (SQLException e) {
- LOG.error("Exception while closing connector",e);
+ LOG.error("Exception while closing connector", e);
}
db = null;
}
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
Fri Jan 11 05:20:30 2013
@@ -16,20 +16,23 @@
*/
package org.apache.ambari.eventdb.model;
-import org.apache.commons.lang.StringUtils;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
+
+import org.apache.ambari.eventdb.model.DataTable.Summary;
+import org.apache.commons.lang.StringUtils;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Workflows {
List<WorkflowDBEntry> workflows;
+ Summary summary;
public static class WorkflowDBEntry {
public static enum WorkflowFields {
@@ -38,8 +41,11 @@ public class Workflows {
USERNAME,
STARTTIME,
LASTUPDATETIME,
+ DURATION,
NUMJOBSTOTAL,
NUMJOBSCOMPLETED,
+ INPUTBYTES,
+ OUTPUTBYTES,
PARENTWORKFLOWID,
WORKFLOWCONTEXT;
@@ -153,19 +159,19 @@ public class Workflows {
public void setWorkflowContext(WorkflowContext workflowContext) {
this.workflowContext = workflowContext;
}
-
+
public long getInputBytes() {
return inputBytes;
}
-
+
public void setInputBytes(long inputBytes) {
this.inputBytes = inputBytes;
}
-
+
public long getOutputBytes() {
return outputBytes;
}
-
+
public void setOutputBytes(long outputBytes) {
this.outputBytes = outputBytes;
}
@@ -180,4 +186,12 @@ public class Workflows {
public void setWorkflows(List<WorkflowDBEntry> workflows) {
this.workflows = workflows;
}
+
+ /**
+  * Returns the summary object stored on this workflow list.
+  *
+  * @return the current summary; whatever was last passed to the setter, or
+  *         the field's initial value if it was never assigned
+  */
+ public Summary getSummary() {
+   return this.summary;
+ }
+
+ /**
+  * Replaces the summary object stored on this workflow list.
+  *
+  * @param summary the summary to store; kept by reference, not copied
+  */
+ public void setSummary(final Summary summary) {
+   this.summary = summary;
+ }
}
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
Fri Jan 11 05:20:30 2013
@@ -17,62 +17,23 @@
package org.apache.ambari.eventdb.webservice;
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.api.json.JSONJAXBContext;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
+import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.ContextResolver;
-import javax.xml.bind.JAXBContext;
-import org.apache.ambari.eventdb.model.TaskLocalityData;
-import org.apache.ambari.eventdb.model.Jobs;
-import org.apache.ambari.eventdb.model.TaskData;
-import org.apache.ambari.eventdb.model.WorkflowContext;
-import org.apache.ambari.eventdb.model.WorkflowDag;
-import org.apache.ambari.eventdb.model.Workflows;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
@Provider
-public class JAXBContextResolver implements ContextResolver<JAXBContext> {
-
- /* NOTE: Remember to add any new Model classes to this list. */
- private static final Class[] classes = {
- WorkflowContext.class,
- WorkflowDag.class,
- WorkflowDag.WorkflowDagEntry.class,
- Jobs.class,
- Jobs.JobDBEntry.class,
- Workflows.class,
- Workflows.WorkflowDBEntry.class,
- TaskData.class,
- TaskData.Point.class,
- TaskLocalityData.class,
- TaskLocalityData.DataPoint.class
- };
-
- private static final Set<Class> types =
- new HashSet<Class>(Arrays.asList(classes));
-
- private static final JAXBContext context;
-
- static {
- JAXBContext tmpContext;
-
- try {
- tmpContext = new JSONJAXBContext(JSONConfiguration.natural().build(),
classes);
- } catch (Exception e) {
- /* Do Nothing (with the exception). */
- tmpContext = null;
- }
-
- context = tmpContext;
+public class JAXBContextResolver extends JacksonJaxbJsonProvider {
+ public JAXBContextResolver() {
+ super();
}
-
+
@Override
- public JAXBContext getContext(Class<?> classType) {
- return (types.contains(classType)) ? context : null;
+ public ObjectMapper locateMapper(Class<?> type, MediaType mediaType) {
+ ObjectMapper mapper = super.locateMapper(type, mediaType);
+ mapper.setSerializationInclusion(Inclusion.NON_NULL);
+ return mapper;
}
}
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
Fri Jan 11 05:20:30 2013
@@ -34,6 +34,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.apache.ambari.eventdb.db.PostgresConnector;
+import org.apache.ambari.eventdb.model.DataTable;
import org.apache.ambari.eventdb.model.Jobs;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
import org.apache.ambari.eventdb.model.TaskAttempt;
@@ -43,6 +44,7 @@ import org.apache.ambari.eventdb.model.T
import org.apache.ambari.eventdb.model.TaskLocalityData.DataPoint;
import org.apache.ambari.eventdb.model.Workflows;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
+import
org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
@Path("/jobhistory")
public class WorkflowJsonService {
@@ -57,8 +59,12 @@ public class WorkflowJsonService {
private static final String DEFAULT_USERNAME = "mapred";
private static final String DEFAULT_PASSWORD = "mapred";
- private static final List<WorkflowDBEntry> EMPTY_WORKFLOWS =
Collections.emptyList();
+ private static final Workflows EMPTY_WORKFLOWS = new Workflows();
private static final List<JobDBEntry> EMPTY_JOBS = Collections.emptyList();
+ // EMPTY_WORKFLOWS is a static field, so it must be populated by a static
+ // initializer. An instance initializer would leave the shared object without
+ // its list until the first WorkflowJsonService is constructed, and would
+ // re-assign the list on every later construction.
+ static {
+   List<WorkflowDBEntry> emptyWorkflows = Collections.emptyList();
+   EMPTY_WORKFLOWS.setWorkflows(emptyWorkflows);
+ }
PostgresConnector getConnector() throws IOException {
return new PostgresConnector(DEFAULT_HOSTNAME, DEFAULT_DBNAME,
DEFAULT_USERNAME, DEFAULT_PASSWORD);
@@ -70,15 +76,23 @@ public class WorkflowJsonService {
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/workflow")
- public Workflows getWorkflows() {
- Workflows workflows = new Workflows();
+ public Workflows getWorkflows(@QueryParam("orderBy") String field,
@DefaultValue(PostgresConnector.SORT_ASC) @QueryParam("sortDir") String sortDir,
+ @DefaultValue("0") @QueryParam("offset") int offset, @DefaultValue("-1")
@QueryParam("limit") int limit) {
+ Workflows workflows = EMPTY_WORKFLOWS;
PostgresConnector conn = null;
try {
conn = getConnector();
- workflows.setWorkflows(conn.fetchWorkflows());
+ if (field == null)
+ workflows = conn.fetchWorkflows();
+ else {
+ field = field.toUpperCase();
+ if ("ELAPSEDTIME".equals(field))
+ field = "DURATION";
+ workflows = conn.fetchWorkflows(WorkflowFields.valueOf(field),
sortDir.toUpperCase().equals(PostgresConnector.SORT_ASC), offset, limit);
+ }
} catch (IOException e) {
e.printStackTrace();
- workflows.setWorkflows(EMPTY_WORKFLOWS);
+ workflows = EMPTY_WORKFLOWS;
} finally {
if (conn != null) {
conn.close();
@@ -89,6 +103,77 @@ public class WorkflowJsonService {
@GET
@Produces(MediaType.APPLICATION_JSON)
+ @Path("/datatable")
+ public DataTable getWorkflowDataTable(@DefaultValue("0")
@QueryParam("iDisplayStart") int start,
+ @DefaultValue("10") @QueryParam("iDisplayLength") int amount,
@QueryParam("sSearch") String searchTerm, @DefaultValue("0")
@QueryParam("sEcho") int echo,
+ @DefaultValue("0") @QueryParam("iSortCol_0") int col,
@DefaultValue(PostgresConnector.SORT_ASC) @QueryParam("sSortDir_0") String sdir,
+ @QueryParam("sSearch_0") String workflowId, @QueryParam("sSearch_1")
String workflowName, @QueryParam("sSearch_2") String workflowType,
+ @QueryParam("sSearch_3") String userName, @DefaultValue("-1")
@QueryParam("minJobs") int minJobs, @DefaultValue("-1") @QueryParam("maxJobs")
int maxJobs,
+ @DefaultValue("-1") @QueryParam("minInputBytes") long minInputBytes,
@DefaultValue("-1") @QueryParam("maxInputBytes") long maxInputBytes,
+ @DefaultValue("-1") @QueryParam("minOutputBytes") long minOutputBytes,
@DefaultValue("-1") @QueryParam("maxOutputBytes") long maxOutputBytes,
+ @DefaultValue("-1") @QueryParam("minDuration") long minDuration,
@DefaultValue("-1") @QueryParam("maxDuration") long maxDuration,
+ @DefaultValue("-1") @QueryParam("minStartTime") long minStartTime,
@DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime) {
+
+ if (start < 0)
+ start = 0;
+ if (amount < 10 || amount > 100)
+ amount = 10;
+
+ boolean sortAscending = true;
+ if (!sdir.toUpperCase().equals(PostgresConnector.SORT_ASC))
+ sortAscending = false;
+
+ WorkflowFields field = null;
+ switch (col) {
+ case 0: // workflowId
+ field = WorkflowFields.WORKFLOWID;
+ break;
+ case 1: // workflowName
+ field = WorkflowFields.WORKFLOWNAME;
+ break;
+ case 2: // workflowType
+ field = WorkflowFields.WORKFLOWID;
+ break;
+ case 3: // userName
+ field = WorkflowFields.USERNAME;
+ break;
+ case 4: // numJobsTotal
+ field = WorkflowFields.NUMJOBSTOTAL;
+ break;
+ case 5: // inputBytes
+ field = WorkflowFields.INPUTBYTES;
+ break;
+ case 6: // outputBytes
+ field = WorkflowFields.OUTPUTBYTES;
+ break;
+ case 7: // duration
+ field = WorkflowFields.DURATION;
+ break;
+ case 8: // startTime
+ field = WorkflowFields.STARTTIME;
+ break;
+ default:
+ field = WorkflowFields.WORKFLOWID;
+ }
+
+ DataTable table = null;
+ PostgresConnector conn = null;
+ try {
+ conn = getConnector();
+ table = conn.fetchWorkflows(start, amount, searchTerm, echo, field,
sortAscending, workflowId, workflowName, workflowType, userName, minJobs,
maxJobs,
+ minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes,
minDuration, maxDuration, minStartTime, maxStartTime);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ return table;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/job")
public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
Jobs jobs = new Jobs();
Modified: incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py
(original)
+++ incubator/ambari/trunk/ambari-server/src/main/python/setupAgent.py Fri Jan
11 05:20:30 2013
@@ -91,12 +91,18 @@ def runAgent(passPhrase):
try:
# print this to the log. despite the directory, machine.py works with
Python 2.4
ret = execOsCommand(["python",
"/usr/lib/python2.6/site-packages/ambari_agent/machine.py"])
+ if not 0 == ret['exitstatus']:
+ return ret['exitstatus']
print ret['log']
ret = execOsCommand(["tail", "-20",
"/var/log/ambari-agent/ambari-agent.log"])
+ if not 0 == ret['exitstatus']:
+ return ret['exitstatus']
print ret['log']
+
+ return 0
except (Exception), e:
- return
+ return 1
def main(argv=None):
scriptDir = os.path.realpath(os.path.dirname(argv[0]))
@@ -113,7 +119,7 @@ def main(argv=None):
installAgent()
configureAgent(hostName)
- runAgent(passPhrase)
+ sys.exit(runAgent(passPhrase))
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
Modified:
incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
Fri Jan 11 05:20:30 2013
@@ -177,6 +177,8 @@ CREATE TABLE workflow (
workflowContext TEXT, userName TEXT,
startTime BIGINT, lastUpdateTime BIGINT,
numJobsTotal INTEGER, numJobsCompleted INTEGER,
+ inputBytes BIGINT, outputBytes BIGINT,
+ duration BIGINT,
PRIMARY KEY (workflowId),
FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId)
);
Modified:
incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php
(original)
+++
incubator/ambari/trunk/contrib/addons/src/addOns/nagios/scripts/nagios_alerts.php
Fri Jan 11 05:20:30 2013
@@ -379,7 +379,7 @@ function hdp_mon_generate_response( $res
case "HBASE":
case "ZOOKEEPER":
case "OOZIE":
- case "TEMPLETON":
+ case "WEBHCAT":
case "GANGLIA":
case "PUPPET":
break;
Modified:
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java?rev=1431892&r1=1431891&r2=1431892&view=diff
==============================================================================
---
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
(original)
+++
incubator/ambari/trunk/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
Fri Jan 11 05:20:30 2013
@@ -141,11 +141,12 @@ public class MapReduceJobHistoryUpdater
"userName, " +
"startTime, " +
"lastUpdateTime, " +
+ "duration, " +
"numJobsTotal, " +
"numJobsCompleted" +
") " +
"VALUES" +
- " (?, ?, ?, ?, ?, ?, ?, ?)"
+ " (?, ?, ?, ?, ?, ?, 0, ?, 0)"
);
workflowUpdateTimePS =
@@ -153,21 +154,33 @@ public class MapReduceJobHistoryUpdater
"UPDATE " +
WORKFLOW_TABLE +
" SET " +
- "lastUpdateTime = ? " +
+ "lastUpdateTime = ?, " +
+ "duration = ? - (SELECT startTime FROM " +
+ WORKFLOW_TABLE +
+ " WHERE workflowId = ?) " +
"WHERE workflowId = ?"
);
workflowUpdateNumCompletedPS =
connection.prepareStatement(
- "UPDATE " +
+ "WITH sums as (SELECT sum(inputBytes) as input, " +
+ "sum(outputBytes) as output, workflowId FROM " +
+ JOB_TABLE +
+ " WHERE workflowId = (SELECT workflowId FROM " +
+ JOB_TABLE +
+ " WHERE jobId = ?) AND status = 'SUCCESS'" +
+ " GROUP BY workflowId) " +
+ "UPDATE " +
WORKFLOW_TABLE +
" SET " +
"lastUpdateTime = ?, " +
- "numJobsCompleted = numJobsCompleted + 1 " +
- "WHERE workflowId = " +
- "(SELECT workflowId FROM " +
- JOB_TABLE +
- " WHERE jobId = ?)"
+ "duration = ? - (SELECT startTime FROM " +
+ WORKFLOW_TABLE +
+ " WHERE workflowId = (SELECT workflowId FROM sums)), " +
+ "numJobsCompleted = numJobsCompleted + 1, " +
+ "inputBytes = (select input from sums), " +
+ "outputBytes = (select output from sums) " +
+ "WHERE workflowId = (select workflowId from sums)"
);
// JobFinishedEvent
@@ -469,13 +482,13 @@ public class MapReduceJobHistoryUpdater
workflowUpdateTimePS, originalEvent,
(JobSubmittedEvent)parsedEvent);
} else if (eventClass == JobFinishedEvent.class) {
- processJobFinishedEvent(entityPS,
+ processJobFinishedEvent(entityPS, workflowUpdateNumCompletedPS,
originalEvent, (JobFinishedEvent)parsedEvent);
} else if (eventClass == JobInitedEvent.class){
processJobInitedEvent(entityPS,
originalEvent, (JobInitedEvent)parsedEvent);
} else if (eventClass == JobStatusChangedEvent.class) {
- processJobStatusChangedEvent(entityPS, workflowUpdateNumCompletedPS,
+ processJobStatusChangedEvent(entityPS,
originalEvent, (JobStatusChangedEvent)parsedEvent);
} else if (eventClass == JobInfoChangeEvent.class) {
processJobInfoChangeEvent(entityPS,
@@ -636,13 +649,14 @@ public class MapReduceJobHistoryUpdater
workflowPS.setLong(5, historyEvent.getSubmitTime());
workflowPS.setLong(6, historyEvent.getSubmitTime());
workflowPS.setLong(7, workflowContext.getWorkflowDag().size());
- workflowPS.setLong(8, 0);
workflowPS.executeUpdate();
LOG.debug("Successfully inserted workflowId = " +
workflowContext.getWorkflowId());
} else {
workflowUpdateTimePS.setLong(1, historyEvent.getSubmitTime());
- workflowUpdateTimePS.setString(2, workflowContext.getWorkflowId());
+ workflowUpdateTimePS.setLong(2, historyEvent.getSubmitTime());
+ workflowUpdateTimePS.setString(3, workflowContext.getWorkflowId());
+ workflowUpdateTimePS.setString(4, workflowContext.getWorkflowId());
workflowUpdateTimePS.executeUpdate();
LOG.debug("Successfully updated workflowId = " +
workflowContext.getWorkflowId());
@@ -666,6 +680,7 @@ public class MapReduceJobHistoryUpdater
private void processJobFinishedEvent(
PreparedStatement entityPS,
+ PreparedStatement workflowUpdateNumCompletedPS,
LoggingEvent logEvent, JobFinishedEvent historyEvent) {
Counters counters = historyEvent.getMapCounters();
long inputBytes = 0;
@@ -698,6 +713,11 @@ public class MapReduceJobHistoryUpdater
entityPS.setLong(7, outputBytes);
entityPS.setString(8, historyEvent.getJobid().toString());
entityPS.executeUpdate();
+ // job finished events always have success status
+ workflowUpdateNumCompletedPS.setString(1,
historyEvent.getJobid().toString());
+ workflowUpdateNumCompletedPS.setLong(2, historyEvent.getFinishTime());
+ workflowUpdateNumCompletedPS.setLong(3, historyEvent.getFinishTime());
+ workflowUpdateNumCompletedPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job "
+
historyEvent.getJobid() + " into " + JOB_TABLE, sqle);
@@ -725,18 +745,11 @@ public class MapReduceJobHistoryUpdater
private void processJobStatusChangedEvent(
PreparedStatement entityPS,
- PreparedStatement workflowUpdateNumCompletedPS,
LoggingEvent logEvent, JobStatusChangedEvent historyEvent) {
try {
entityPS.setString(1, historyEvent.getStatus());
entityPS.setString(2, historyEvent.getJobId().toString());
entityPS.executeUpdate();
- if ("SUCCESS".equals(historyEvent.getStatus())) {
- workflowUpdateNumCompletedPS.setLong(1, System.currentTimeMillis());
- workflowUpdateNumCompletedPS.setString(2,
- historyEvent.getJobId().toString());
- workflowUpdateNumCompletedPS.executeUpdate();
- }
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job "
+
historyEvent.getJobId() + " into " + JOB_TABLE, sqle);