Updated Branches: refs/heads/trunk 8f5b66286 -> 44cc11dfb
AMBARI-2759. ambari-log4j doesn't work with MySQL. (Chen Chun via billie) Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/44cc11df Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/44cc11df Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/44cc11df Branch: refs/heads/trunk Commit: 44cc11dfb38b5dd3ce770fcd3a632c26ae37a596 Parents: 8f5b662 Author: Billie Rinaldi <[email protected]> Authored: Fri Aug 30 13:34:01 2013 -0700 Committer: Billie Rinaldi <[email protected]> Committed: Fri Aug 30 13:34:01 2013 -0700 ---------------------------------------------------------------------- .../ambari/eventdb/db/PostgresConnector.java | 5 ++- .../ambari/eventdb/model/TaskAttempt.java | 7 ++++ .../jobhistory/MapReduceJobHistoryUpdater.java | 40 ++++++++++++-------- 3 files changed, 34 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java b/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java index 6b0d36c..e96fe92 100644 --- a/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java +++ b/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java @@ -81,8 +81,9 @@ public class PostgresConnector implements DBConnector { FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"), FJTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? ORDER BY " + TaskAttemptFields.STARTTIME), - FWTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", (SELECT " + JobFields.JOBID + " as id FROM " + JOB_TABLE_NAME - + " WHERE " + JobFields.WORKFLOWID + " = ?) AS jobs WHERE " + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = jobs.id " + FWTA_PS("SELECT " + TaskAttemptFields.join(TASK_ATTEMPT_TABLE_NAME) + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", " + JOB_TABLE_NAME + " WHERE " + + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = " + JOB_TABLE_NAME + "." + JobFields.JOBID + " AND " + JOB_TABLE_NAME + "." + + JobFields.WORKFLOWID + " = ?" + " ORDER BY " + TaskAttemptFields.JOBID + "," + TaskAttemptFields.STARTTIME + ", " + TaskAttemptFields.FINISHTIME), FTA_TIMERANGE_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.FINISHTIME + " >= ? AND " + TaskAttemptFields.STARTTIME + " <= ? AND (" + TaskAttemptFields.TASKTYPE + " = 'MAP' OR " + TaskAttemptFields.TASKTYPE + " = 'REDUCE') ORDER BY " http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java b/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java index 0b0914c..526d63d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java +++ b/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java @@ -57,6 +57,13 @@ public class TaskAttempt { tmp[i] = TaskAttemptFields.values()[i].toString(); return StringUtils.join(tmp, ","); } + + public static String join(String tableName) { + String[] tmp = new String[TaskAttemptFields.values().length]; + for (int i = 0; i < tmp.length; i++) + tmp[i] = tableName + "." + TaskAttemptFields.values()[i].toString(); + return StringUtils.join(tmp, ","); + } } public static final String TASK_ATTEMPT_FIELDS = TaskAttemptFields.join(); http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java ---------------------------------------------------------------------- diff --git a/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java b/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java index aec5415..d62494b 100644 --- a/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java +++ b/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java @@ -162,9 +162,7 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider { "workflowContext = ?, " + "numJobsTotal = ?, " + "lastUpdateTime = ?, " + - "duration = ? - (SELECT startTime FROM " + - WORKFLOW_TABLE + - " WHERE workflowId = ?) " + + "duration = ? - startTime " + "WHERE workflowId = ?" ); @@ -174,20 +172,31 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider { WORKFLOW_TABLE + " SET " + "lastUpdateTime = ?, " + - "duration = ? - (SELECT startTime FROM " + - WORKFLOW_TABLE + - " WHERE workflowId = selectid), " + - "numJobsCompleted = rows, " + - "inputBytes = input, " + - "outputBytes = output " + - "FROM (SELECT count(*) as rows, sum(inputBytes) as input, " + - "sum(outputBytes) as output, workflowId as selectid FROM " + - JOB_TABLE + + "duration = ? - startTime, " + + "numJobsCompleted = (" + + "SELECT count(*)" + + " FROM " + + JOB_TABLE + + " WHERE " + + "workflowId = " + WORKFLOW_TABLE + ".workflowId" + + " AND status = 'SUCCESS'), " + + "inputBytes = (" + + "SELECT sum(inputBytes)" + + " FROM " + + JOB_TABLE + + " WHERE " + + "workflowId = " + WORKFLOW_TABLE + ".workflowId" + + " AND status = 'SUCCESS'), " + + "outputBytes = (" + + "SELECT sum(outputBytes)" + + " FROM " + + JOB_TABLE + + " WHERE " + + "workflowId = " + WORKFLOW_TABLE + ".workflowId" + + " AND status = 'SUCCESS') " + " WHERE workflowId = (SELECT workflowId FROM " + JOB_TABLE + - " WHERE jobId = ?) AND status = 'SUCCESS' " + - "GROUP BY workflowId) as jobsummary " + - "WHERE workflowId = selectid" + " WHERE jobId = ?)" ); // JobFinishedEvent @@ -726,7 +735,6 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider { workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime()); workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime()); workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId()); - workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId()); workflowUpdateTimePS.executeUpdate(); LOG.debug("Successfully updated workflowId = " + workflowContext.getWorkflowId());
