Author: cdouglas
Date: Fri Jan 16 19:16:49 2009
New Revision: 735218

URL: http://svn.apache.org/viewvc?rev=735218&view=rev
Log:
HADOOP-4843. Collect job history and configuration in Chukwa. Contributed by 
Eric Yang
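
For context: a minimal sketch of how the new class might be enabled, assuming the standard mapred.jobtracker.instrumentation hook and that the chukwa client jar is on the JobTracker classpath (neither is part of this change):

    <!-- hypothetical *-site.xml entry; not included in this commit -->
    <property>
      <name>mapred.jobtracker.instrumentation</name>
      <value>org.apache.hadoop.mapred.ChukwaJobTrackerInstrumentation</value>
    </property>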

Added:
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/chukwa/build.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=735218&r1=735217&r2=735218&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jan 16 19:16:49 2009
@@ -358,6 +358,9 @@
     BlockCompressorStream, and BlockDecompressorStream public to facilitate 
     non-Hadoop codecs. (omalley)
 
+    HADOOP-4843. Collect job history and configuration in Chukwa. (Eric Yang
+    via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/contrib/chukwa/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/build.xml?rev=735218&r1=735217&r2=735218&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/build.xml (original)
+++ hadoop/core/trunk/src/contrib/chukwa/build.xml Fri Jan 16 19:16:49 2009
@@ -276,7 +276,7 @@
                <mkdir dir="${build.dir}"/>
                <mkdir dir="${build.classes}"/>
                <mkdir dir="${build.dir}/test"/>
-               <javac srcdir="src/java/org/apache/hadoop/chukwa" destdir="${build.classes}" excludes="**/ChukwaTTInstru.java" debug="${javac.debug}">
+               <javac srcdir="src/java/org/apache/hadoop" destdir="${build.classes}" excludes="**/ChukwaTTInstru.java" debug="${javac.debug}">
                        <classpath refid="classpath" />
                </javac>
        </target>
@@ -446,6 +446,7 @@
 
                <jar jarfile="${build.dir}/chukwa-hadoop-${chukwaVersion}-client.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/inputtools/log4j/**/*.class">
                        <fileset dir="${basedir}/src/java">
+                               <include name="org/apache/hadoop/mapred/**/*.java"/>
                                <include name="org/apache/hadoop/chukwa/inputtools/log4j/**/*.java"/>
                                <include name="org/apache/hadoop/chukwa/datacollection/client/**/*.java"/>
                                <include name="org/apache/hadoop/chukwa/util/**/*.java"/>
@@ -454,6 +455,7 @@
                                <include name="chukwa-hadoop-metrics-log4j.properties"/>
                        </fileset>
                        <fileset dir="${build.classes}">
+                               <include name="org/apache/hadoop/mapred/**/*.class"/>
                                <include name="org/apache/hadoop/chukwa/datacollection/client/**/*.class"/>
                                <include name="org/apache/hadoop/chukwa/util/**/*.class"/>
                                <include name="org/apache/hadoop/chukwa/datacollection/controller/*.class"/>

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java?rev=735218&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java Fri Jan 16 19:16:49 2009
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.fs.Path;
+
+public class ChukwaJobTrackerInstrumentation extends org.apache.hadoop.mapred.JobTrackerInstrumentation {
+
+         protected final JobTracker tracker;
+         private static ChukwaAgentController chukwaClient = null;
+         private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
+         private static HashMap<JobID, Long> jobConfs = null;
+         private static HashMap<JobID, Long> jobHistories = null;
+
+         public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+          super(jt,conf);
+             tracker = jt;
+             if(chukwaClient==null) {
+                     chukwaClient = new ChukwaAgentController();
+             }
+             if(jobConfs==null) {
+                 jobConfs = new HashMap<JobID, Long>();
+             }
+             if(jobHistories==null) {
+                 jobHistories = new HashMap<JobID, Long>();
+             }
+         }
+
+         public void launchMap(TaskAttemptID taskAttemptID) {
+                 
+         }
+
+         public void completeMap(TaskAttemptID taskAttemptID) {
+                 
+         }
+
+         public void launchReduce(TaskAttemptID taskAttemptID) {
+                 
+         }
+
+         public void completeReduce(TaskAttemptID taskAttemptID) {
+                 
+         }
+
+         public void submitJob(JobConf conf, JobID id) {
+          String chukwaJobConf = tracker.getLocalJobFilePath(id);
+          try {
+              String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+              Path jobHistoryPath = JobHistory.JobInfo.getJobHistoryLogLocation(jobFileName);
+              String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
+              long adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobConf", "0 "+jobConfPath, 0);
+              jobConfs.put(id, adaptorID);
+              if(jobHistoryPath.toString().matches("^hdfs://")) {
+                  adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor", "JobHistory", "0 "+jobHistoryPath.toString(), 0);
+              } else {
+                  adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobHistory", "0 "+jobHistoryPath.toString().substring(5), 0);
+              }
+              jobHistories.put(id, adaptorID);
+          } catch(Exception ex) {
+                 
+          }
+      }
+
+         public void completeJob(JobConf conf, JobID id) {
+          try {
+             if (jobHistories.containsKey(id)) {
+                 chukwaClient.remove(jobHistories.get(id));
+             }
+             if (jobConfs.containsKey(id)) {
+                 chukwaClient.remove(jobConfs.get(id));
+             }
+          } catch(Throwable e) {
+            log.warn("could not remove adaptor for this job: " + id.toString(),e);
+            e.printStackTrace();
+          }
+         }
+
+}

