Author: cutting
Date: Fri Apr 28 11:33:53 2006
New Revision: 397978

URL: http://svn.apache.org/viewcvs?rev=397978&view=rev
Log:
Fix HADOOP-167.  Reduce the number of Configuration and JobConf's allocated.  
Contributed by Owen.

Modified:
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Grep.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Grep.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Grep.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Grep.java 
(original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Grep.java Fri 
Apr 28 11:33:53 2006
@@ -27,8 +27,7 @@
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.hadoop.conf.Configuration;
-
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Random;
@@ -43,13 +42,11 @@
       System.exit(-1);
     }
 
-    Configuration defaults = new Configuration();
-
     Path tempDir =
       new Path("grep-temp-"+
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
-    JobConf grepJob = new JobConf(defaults, Grep.class);
+    JobConf grepJob = new JobConf(Grep.class);
     grepJob.setJobName("grep-search");
 
     grepJob.setInputPath(new Path(args[0]));
@@ -69,7 +66,7 @@
 
     JobClient.runJob(grepJob);
 
-    JobConf sortJob = new JobConf(defaults, Grep.class);
+    JobConf sortJob = new JobConf(Grep.class);
     sortJob.setJobName("grep-sort");
 
     sortJob.setInputPath(tempDir);
@@ -86,7 +83,7 @@
 
     JobClient.runJob(sortJob);
 
-    new JobClient(defaults).getFs().delete(tempDir);
+    FileSystem.get(grepJob).delete(tempDir);
   }
 
 }

Modified: 
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java 
(original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java 
Fri Apr 28 11:33:53 2006
@@ -96,9 +96,7 @@
    *                     job tracker.
    */
   public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
-    
-    JobConf conf = new JobConf(defaults, WordCount.class);
+    JobConf conf = new JobConf(WordCount.class);
     conf.setJobName("wordcount");
  
     // the keys are words (strings)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Fri 
Apr 28 11:33:53 2006
@@ -19,7 +19,7 @@
 import java.util.*;
 import java.net.URL;
 import java.io.*;
-import java.util.logging.Logger;
+import java.util.logging.*;
 
 import javax.xml.parsers.*;
 
@@ -29,7 +29,7 @@
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
 
-import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -62,12 +62,19 @@
 
   /** A new configuration. */
   public Configuration() {
+    if (LOG.isLoggable(Level.FINE)) {
+      LOG.fine(StringUtils.stringifyException(new IOException("config()")));
+    }
     defaultResources.add("hadoop-default.xml");
     finalResources.add("hadoop-site.xml");
   }
 
   /** A new configuration with the same settings cloned from another. */
   public Configuration(Configuration other) {
+    if (LOG.isLoggable(Level.FINE)) {
+      LOG.fine(StringUtils.stringifyException
+                 (new IOException("config(config)")));
+    }
     this.defaultResources = (ArrayList)other.defaultResources.clone();
     this.finalResources = (ArrayList)other.finalResources.clone();
     if (other.properties != null)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Fri Apr 28 
11:33:53 2006
@@ -237,7 +237,10 @@
           implementation.getMethod(call.getMethodName(),
                                    call.getParameterClasses());
 
+        long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
+        long callTime = System.currentTimeMillis() - startTime;
+        LOG.fine("Call: " + call.getMethodName() + " " + callTime);
         if (verbose) log("Return: "+value);
 
         return new ObjectWritable(method.getReturnType(), value);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Apr 
28 11:33:53 2006
@@ -29,7 +29,6 @@
 import java.net.URLDecoder;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 
@@ -49,10 +48,34 @@
  * of input files, and where the output files should be written. */
 public class JobConf extends Configuration {
 
+  private void initialize() {
+    addDefaultResource("mapred-default.xml");
+  }
+  
+  private void initialize(Class exampleClass) {
+    initialize();
+    String jar = findContainingJar(exampleClass);
+    if (jar != null) {
+      setJar(jar);
+    }   
+  }
+  
+  /**
+   * Construct a map/reduce job configuration.
+   */
   public JobConf() {
-    super();
+    initialize();
   }
-    
+
+  /** 
+   * Construct a map/reduce job configuration.
+   * @param conf a Configuration whose settings will be inherited.
+   * @param exampleClass a class whose containing jar is used as the job's jar.
+   */
+  public JobConf(Class exampleClass) {
+    initialize(exampleClass);
+  }
+  
   /**
    * Construct a map/reduce job configuration.
    * 
@@ -61,21 +84,18 @@
    */
   public JobConf(Configuration conf) {
     super(conf);
-    addDefaultResource("mapred-default.xml");
+    initialize();
   }
 
 
   /** Construct a map/reduce job configuration.
    * 
    * @param conf a Configuration whose settings will be inherited.
-   * @param aClass a class whose containing jar is used as the job's jar.
+   * @param exampleClass a class whose containing jar is used as the job's jar.
    */
-  public JobConf(Configuration conf, Class aClass) {
+  public JobConf(Configuration conf, Class exampleClass) {
     this(conf);
-    String jar = findContainingJar(aClass);
-    if (jar != null) {
-      setJar(jar);
-    }
+    initialize(exampleClass);
   }
 
 

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java 
Fri Apr 28 11:33:53 2006
@@ -172,7 +172,11 @@
   }
 
   public void setConf(Configuration conf) {
-    this.jobConf = new JobConf(conf);
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf) conf;
+    } else {
+      this.jobConf = new JobConf(conf);
+    }
   }
 
   public Configuration getConf() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Apr 
28 11:33:53 2006
@@ -34,8 +34,8 @@
   }
 
   private FileSplit split;
-  private MapOutputFile mapOutputFile;
-  private Configuration conf;
+  private MapOutputFile mapOutputFile = new MapOutputFile();
+  private JobConf conf;
 
   public MapTask() {}
 
@@ -149,9 +149,12 @@
   }
 
   public void setConf(Configuration conf) {
-    this.conf = conf;
-    this.mapOutputFile = new MapOutputFile();
-    this.mapOutputFile.setConf(conf);
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+    this.mapOutputFile.setConf(this.conf);
   }
 
   public Configuration getConf() {

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java 
Fri Apr 28 11:33:53 2006
@@ -23,7 +23,7 @@
 class MapTaskRunner extends TaskRunner {
   private MapOutputFile mapOutputFile;
 
-  public MapTaskRunner(Task task, TaskTracker tracker, Configuration conf) {
+  public MapTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri 
Apr 28 11:33:53 2006
@@ -46,8 +46,8 @@
   private Progress appendPhase = getProgress().addPhase("append");
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
-  private Configuration conf;
-  private MapOutputFile mapOutputFile;
+  private JobConf conf;
+  private MapOutputFile mapOutputFile = new MapOutputFile();
 
   public ReduceTask() {}
 
@@ -307,9 +307,12 @@
   }
 
   public void setConf(Configuration conf) {
-    this.conf = conf;
-    this.mapOutputFile = new MapOutputFile();
-    this.mapOutputFile.setConf(conf);
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+    this.mapOutputFile.setConf(this.conf);
   }
 
   public Configuration getConf() {

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java 
Fri Apr 28 11:33:53 2006
@@ -31,7 +31,7 @@
     LogFormatter.getLogger("org.apache.hadoop.mapred.ReduceTaskRunner");
   private MapOutputFile mapOutputFile;
 
-  public ReduceTaskRunner(Task task, TaskTracker tracker, Configuration conf) {
+  public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri 
Apr 28 11:33:53 2006
@@ -38,9 +38,9 @@
   private Task t;
   private TaskTracker tracker;
 
-  protected Configuration conf;
+  protected JobConf conf;
 
-  public TaskRunner(Task t, TaskTracker tracker, Configuration conf) {
+  public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
     this.t = t;
     this.tracker = tracker;
     this.conf = conf;
@@ -76,8 +76,7 @@
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
 
-      JobConf job = new JobConf(t.getJobFile());
-      String jar = job.getJar();
+      String jar = conf.getJar();
       if (jar != null) {                      // if jar exists, it into workDir
         unJar(new File(jar), workDir);
         File[] libs = new File(workDir, "lib").listFiles();
@@ -124,10 +123,10 @@
       //     </value>
       //
       String javaOpts = handleDeprecatedHeapSize(
-          job.get("mapred.child.java.opts", "-Xmx200m"),
-          job.get("mapred.child.heap.size"));
+          conf.get("mapred.child.java.opts", "-Xmx200m"),
+          conf.get("mapred.child.heap.size"));
       javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
-      int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1;
+      int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;
       javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
       String [] javaOptsSplit = javaOpts.split(" ");
       for (int i = 0; i < javaOptsSplit.length; i++) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=397978&r1=397977&r2=397978&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri 
Apr 28 11:33:53 2006
@@ -65,7 +65,7 @@
     FileSystem fs = null;
     static final String SUBDIR = "taskTracker";
 
-    private Configuration fConf;
+    private JobConf fConf;
     private MapOutputFile mapOutputFile;
 
     private int maxCurrentTasks;
@@ -83,14 +83,14 @@
     /**
      * Start with the local machine name, and the default JobTracker
      */
-    public TaskTracker(Configuration conf) throws IOException {
+    public TaskTracker(JobConf conf) throws IOException {
       this(JobTracker.getAddress(conf), conf);
     }
 
     /**
      * Start with the local machine name, and the addr of the target JobTracker
      */
-    public TaskTracker(InetSocketAddress jobTrackAddr, Configuration conf) 
throws IOException {
+    public TaskTracker(InetSocketAddress jobTrackAddr, JobConf conf) throws 
IOException {
         maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
 
         this.fConf = conf;
@@ -112,7 +112,7 @@
                                (Math.abs(r.nextInt()) % 100000);
         LOG.info("Starting tracker " + taskTrackerName);
 
-        new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
+        fConf.deleteLocalFiles(SUBDIR);
 
         // Clear out state tables
         this.tasks = new TreeMap();
@@ -409,17 +409,19 @@
         TaskRunner runner;
         boolean done = false;
         boolean wasKilled = false;
-        private JobConf jobConf;
+        private JobConf defaultJobConf;
+        private JobConf localJobConf;
 
         /**
          */
-        public TaskInProgress(Task task, Configuration conf) {
+        public TaskInProgress(Task task, JobConf conf) {
             this.task = task;
             this.progress = 0.0f;
             this.runstate = TaskStatus.UNASSIGNED;
             stateString = "initializing";
             this.lastProgressReport = System.currentTimeMillis();
-            this.jobConf = new JobConf(conf);
+            this.defaultJobConf = conf;
+            localJobConf = null;
         }
 
         /**
@@ -427,31 +429,35 @@
          * So here, edit the Task's fields appropriately.
          */
         private void localizeTask(Task t) throws IOException {
-            this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + 
+                                                 task.getTaskId());
             Path localJobFile =
-              
this.jobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
+              
this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
             Path localJarFile =
-              
this.jobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
+              
this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
 
             String jobFile = t.getJobFile();
             fs.copyToLocalFile(new Path(jobFile), localJobFile);
             t.setJobFile(localJobFile.toString());
 
-            JobConf jc = new JobConf(localJobFile);
-            jc.set("mapred.task.id", task.getTaskId());
-            String jarFile = jc.getJar();
+            localJobConf = new JobConf(localJobFile);
+            localJobConf.set("mapred.task.id", task.getTaskId());
+            String jarFile = localJobConf.getJar();
             if (jarFile != null) {
               fs.copyToLocalFile(new Path(jarFile), localJarFile);
-              jc.setJar(localJarFile.toString());
+              localJobConf.setJar(localJarFile.toString());
 
               FileSystem localFs = FileSystem.getNamed("local", fConf);
               OutputStream out = localFs.create(localJobFile);
               try {
-                jc.write(out);
+                localJobConf.write(out);
               } finally {
                 out.close();
               }
             }
+            // set the task's configuration to the local job conf
+            // rather than the default.
+            t.setConf(localJobConf);
         }
 
         /**
@@ -618,7 +624,7 @@
                 runner.close();
             } catch (IOException ie) {
             }
-            this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + 
task.getTaskId());
         }
     }
 
@@ -735,17 +741,18 @@
           LogFormatter.showTime(false);
           LOG.info("Child starting");
 
-          Configuration conf = new Configuration();
+          JobConf defaultConf = new JobConf();
           int port = Integer.parseInt(args[0]);
           String taskid = args[1];
           TaskUmbilicalProtocol umbilical =
             (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-                                                new InetSocketAddress(port), 
conf);
+                                                new InetSocketAddress(port), 
+                                                defaultConf);
             
           Task task = umbilical.getTask(taskid);
           JobConf job = new JobConf(task.getJobFile());
 
-          conf.addFinalResource(new Path(task.getJobFile()));
+          defaultConf.addFinalResource(new Path(task.getJobFile()));
 
           startPinging(umbilical, taskid);        // start pinging parent
 
@@ -809,7 +816,7 @@
             System.exit(-1);
         }
 
-        TaskTracker tt = new TaskTracker(new Configuration());
+        TaskTracker tt = new TaskTracker(new JobConf());
         tt.run();
     }
 }


Reply via email to