Hi All,

Just popping my head up over the lurkers' parapet for a second to let you know about some development work I've been doing on getting Hadoop to run in an OSGi environment.

Sorry if this is the wrong forum for this post. I did consider sending it to the hadoop-dev list, but on inspection of the traffic there it seemed to be mostly Jira issue tracking, of which this is not yet a part...

In order to get Hadoop to work in an OSGi environment I had to make a couple of changes to the Hadoop core, mostly around better handling of classloading in the TaskTracker$Child process.
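
To give a feel for the fix before you dive into the diff: the core pattern is to build a URLClassLoader over the job's jars and directories, with whatever classloader loaded the Hadoop API classes (in OSGi, a bundle classloader) as its parent. A minimal standalone sketch follows - the class and method names here are illustrative only; the real logic lives in Task.configureClasspath in the patch:

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class ChildClassLoaderSketch {
  public static ClassLoader buildJobClassLoader(File jobCacheDir, ClassLoader parent)
      throws MalformedURLException {
    List<URL> urls = new ArrayList<URL>();
    // pick up the job's bundled libraries, if any
    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (File lib : libs) {
        urls.add(lib.toURL()); // deprecated in later JDKs, but matches the era
      }
    }
    urls.add(new File(jobCacheDir, "classes").toURL());
    urls.add(jobCacheDir.toURL());
    // parent-first delegation means Mapper/Reducer etc. resolve from the parent
    // (OSGi) side, while user classes come from the job's own jars
    return new URLClassLoader(urls.toArray(new URL[urls.size()]), parent);
  }
}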

Attached is a patch file of the changes I made, built against the 0.14.0 svn tag - comments are included at each diff point to explain what I'm doing. I'd obviously be really happy for these changes to be added to Hadoop, or for you to suggest any other changes or improvements.
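
For anyone who wants to try the hooks: the patch introduces two new properties, mapred.child.java.classpath and mapred.child.launcher, which the patched TaskRunner reads when building the child command line. A rough usage sketch - the property names come from the patch, but the values below are made up for illustration:

import org.apache.hadoop.mapred.JobConf;

public class OsgiChildConfigSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // prepended to the child's classpath by the patched TaskRunner
    // (hypothetical jar path)
    conf.set("mapred.child.java.classpath", "/opt/newton/lib/osgi-boot.jar");
    // inserted on the child command line ahead of TaskTracker.Child, so an
    // OSGi-enabled launcher main class (hypothetical name) can bootstrap the
    // framework and then delegate to Child
    conf.set("mapred.child.launcher", "org.example.OsgiChildLauncher");
  }
}

In practice these would live in the tasktracker's hadoop-site.xml, since the TaskTracker patch copies any mapred.child.* entries from its own config into each job's conf.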

Short background with some product placement - please skip if you're not in the slightest bit interested...

The background to this work was getting Hadoop running against our product Infiniflow, which has an open-source component, Newton (http://newton.codecauldron.org).

Newton is a distributed runtime based on OSGi which handles runtime deployment and failover of SCA composites over a compute fabric (our marketing term for a bunch of heterogeneous servers).

Final blurb...

If anyone is able to make it, I should hopefully be presenting some of this work at the NYJavaSIG on 17th October, at a session entitled "OSGi In The Enterprise".

Regards,

Dave Savage
Software Engineer
Paremus Ltd
http://www.paremus.com
http://www.codecauldron.org


Index: src/java/org/apache/hadoop/mapred/TaskLog.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskLog.java      (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskLog.java      (working copy)
@@ -198,8 +198,12 @@
                                                 File stderrFilename,
                                                 long tailLength
                                                ) throws IOException {
-    String stdout = FileUtil.makeShellPath(stdoutFilename);
-    String stderr = FileUtil.makeShellPath(stderrFilename);
+    /* 
+     PATCH NOTE
+        Use absolute filenames to get around the problem where the parent process is not in the same dir as the child
+     */
+    String stdout = FileUtil.makeShellPath(stdoutFilename.getAbsoluteFile());
+    String stderr = FileUtil.makeShellPath(stderrFilename.getAbsoluteFile());
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
Index: src/java/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/java/org/apache/hadoop/mapred/Task.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/Task.java (working copy)
@@ -40,6 +40,16 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+/* 
+ PATCH NOTE
+ Imports to support patch below
+*/
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import org.apache.hadoop.filecache.DistributedCache;
+
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
   private static final Log LOG =
@@ -205,6 +215,86 @@
     }
   }
   
+  /* 
+    PATCH NOTE
+    This logic is basically a cut & paste of work done in org.apache.hadoop.mapred.TaskRunner.
+    The reason for doing it here is to get around the issue where the system classpath
+    does not contain the Hadoop API classes (i.e. Reducer, Mapper, etc.); instead these
+    classes are found on the OSGi classpath, loaded from the parent classloader.
+  */
+  public void configureClasspath(JobConf conf)
+  throws IOException {
+
+         // get the current classloader, which will become the parent
+         ClassLoader parent = conf.getClassLoader();   
+
+         // get the work directory which holds the elements we are dynamically
+         // adding to the classpath
+         File workDir = new File(getJobFile()).getParentFile();
+         File jobCacheDir = new File(workDir.getParent(), "work");
+         ArrayList<URL> urllist = new ArrayList<URL>();
+
+         // add the jars and directories to the classpath
+         String jar = conf.getJar();
+         if (jar != null) {      
+                 File[] libs = new File(jobCacheDir, "lib").listFiles();
+                 if (libs != null) {
+                         for (int i = 0; i < libs.length; i++) {
+                                 urllist.add(libs[i].toURL());
+                         }
+                 }
+                 urllist.add(new File(jobCacheDir, "classes").toURL());
+                 urllist.add(jobCacheDir.toURL());
+
+         }
+         
+      URI[] archives = DistributedCache.getCacheArchives(conf);
+      URI[] files = DistributedCache.getCacheFiles(conf);
+      // include the user specified classpath
+               
+      //archive paths
+      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+      if (archiveClasspaths != null && archives != null) {
+        Path[] localArchives = DistributedCache
+          .getLocalCacheArchives(conf);
+        if (localArchives != null){
+          for (int i=0;i<archives.length;i++){
+            for(int j=0;j<archiveClasspaths.length;j++){
+              if (archives[i].getPath().equals(
+                                               archiveClasspaths[j].toString())){
+                urllist.add(localArchives[i].toUri().toURL());
+              }
+            }
+          }
+        }
+      }
+      //file paths
+      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+      if (fileClasspaths!=null && files != null) {
+        Path[] localFiles = DistributedCache
+          .getLocalCacheFiles(conf);
+        if (localFiles != null) {
+          for (int i = 0; i < files.length; i++) {
+            for (int j = 0; j < fileClasspaths.length; j++) {
+              if (files[i].getPath().equals(
+                                            fileClasspaths[j].toString())) {
+                  urllist.add(localFiles[i].toUri().toURL());
+              }
+            }
+          }
+        }
+      }
+         
+         urllist.add(workDir.toURL());
+
+         // create a new classloader with the old classloader as its parent
+         // then set that classloader as the one used by the current jobconf
+         URL[] urls = urllist.toArray(new URL[urllist.size()]);
+         URLClassLoader loader = new URLClassLoader(urls, parent);
+         conf.setClassLoader(loader);
+  }
+
+  
   /** Run this task as a part of the named job.  This method is executed in the
    * child process and is what invokes user-supplied map, reduce, etc. methods.
    * @param umbilical for progress reports
Index: src/java/org/apache/hadoop/mapred/TaskRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskRunner.java   (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java   (working copy)
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.*;
 import java.io.*;
 import java.util.List;
+import java.util.StringTokenizer;
 import java.util.Vector;
 import java.net.URI;
 
@@ -144,69 +145,32 @@
       String sep = System.getProperty("path.separator");
       StringBuffer classPath = new StringBuffer();
       // start with same classpath as parent process
-      classPath.append(System.getProperty("java.class.path"));
+      /* 
+       PATCH NOTE
+          Qualify the path to get around the problem where the parent process is not in the same dir as the child
+       */
+      classPath.append(qualifyPath(System.getProperty("java.class.path")));
       classPath.append(sep);
       if (!workDir.mkdirs()) {
         if (!workDir.isDirectory()) {
           LOG.fatal("Mkdirs failed to create " + workDir.toString());
         }
       }
-         
-      String jar = conf.getJar();
-      if (jar != null) {       
-        // if jar exists, it into workDir
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            classPath.append(sep);            // add libs from jar to classpath
-            classPath.append(libs[i]);
-          }
-        }
-        classPath.append(sep);
-        classPath.append(new File(jobCacheDir, "classes"));
-        classPath.append(sep);
-        classPath.append(jobCacheDir);
-       
+      /* 
+       PATCH NOTE
+          Allow extra jars on the classpath beyond those found on the system classpath
+       (in OSGi the classpath is built dynamically from installed bundles vs. being
+       statically configured at system boot)
+       */      
+      String jobPath = conf.get( "mapred.child.java.classpath" ); 
+      if ( jobPath != null ) {
+         classPath.append( jobPath );
+         classPath.append( sep );
       }
-
-      // include the user specified classpath
-               
-      //archive paths
-      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-      if (archiveClasspaths != null && archives != null) {
-        Path[] localArchives = DistributedCache
-          .getLocalCacheArchives(conf);
-        if (localArchives != null){
-          for (int i=0;i<archives.length;i++){
-            for(int j=0;j<archiveClasspaths.length;j++){
-              if (archives[i].getPath().equals(
-                                               archiveClasspaths[j].toString())){
-                classPath.append(sep);
-                classPath.append(localArchives[i]
-                                 .toString());
-              }
-            }
-          }
-        }
-      }
-      //file paths
-      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-      if (fileClasspaths!=null && files != null) {
-        Path[] localFiles = DistributedCache
-          .getLocalCacheFiles(conf);
-        if (localFiles != null) {
-          for (int i = 0; i < files.length; i++) {
-            for (int j = 0; j < fileClasspaths.length; j++) {
-              if (files[i].getPath().equals(
-                                            fileClasspaths[j].toString())) {
-                classPath.append(sep);
-                classPath.append(localFiles[i].toString());
-              }
-            }
-          }
-        }
-      }
-
+         /* 
+       PATCH NOTE
+          Logic moved to Task.configureClasspath - see patch comment there
+       */
       classPath.append(sep);
       classPath.append(workDir);
       //  Build exec child jmv args.
@@ -275,7 +239,16 @@
         vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
         vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
 
-        // Add main class and its arguments 
+        // Add main class and its arguments
+        /* 
+         PATCH NOTE
+            Allow the client to specify a launcher class that wraps the main class.
+         This allows me to substitute an OSGi-enabled JVM.
+         */
+        String launcher = conf.get( "mapred.child.launcher" );
+        if ( launcher != null ) {
+               vargs.add( launcher );
+        }
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
         // pass umbilical port
         vargs.add(Integer.toString(tracker.getTaskTrackerReportPort())); 
@@ -328,7 +301,41 @@
   }
 
   
+ /* 
+  PATCH NOTE
+  Utility method to qualify the system classpath - could be more efficient (StringTokenizer, bleugh)
+  */
   /**
+ * @param property a path.separator-delimited classpath
+ * @return the classpath with each element resolved to an absolute path
+ */
+ private String qualifyPath( String property ) {
+       StringBuffer buf = new StringBuffer(property.length() * 10);
+       String sep = System.getProperty("path.separator");
+       
+       StringTokenizer tok = new StringTokenizer(property, sep);
+       
+       while( tok.hasMoreTokens() ) {
+               String element = tok.nextToken();
+       
+               if ( element.trim().length() > 0 ) {
+                       File f = new File( element );
+                       
+                       if ( f.exists() ) {
+                               buf.append( f.getAbsolutePath() );
+                       }
+                       else {
+                               throw new IllegalStateException( "Unknown path element " + element );
+                       }
+                       
+                       buf.append( sep );
+               }
+       }
+       
+       return buf.toString();
+}
+
+/**
    * Handle deprecated mapred.child.heap.size.
    * If present, interpolate into mapred.child.java.opts value with
    * warning.
@@ -399,6 +406,11 @@
       int exit_code = process.waitFor();
      
       if (!killed && exit_code != 0) {
+          /* 
+           PATCH NOTE
+           Debug output to help diagnose problems launching the child process
+           */
+          LOG.warn( "Failed to execute : " + argsToString( args ) );
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");
       }
@@ -408,6 +420,24 @@
       kill();      
     }
   }
+  
+  /* 
+   PATCH NOTE
+   Utility method used in logging
+  */
+  private String argsToString( List<String> args ) {
+      StringBuffer buf = new StringBuffer();
+      
+      for ( String arg : args ) {
+          if ( buf.length() != 0 ) {
+              buf.append( ' ' );
+          }
+          
+          buf.append( arg );
+      }
+      
+      return buf.toString();
+  }
 
   /**
    * Kill the child process
Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskTracker.java  (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java  (working copy)
@@ -21,6 +21,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,6 +29,8 @@
 import java.io.PrintStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -641,7 +644,20 @@
         rjob.localized = true;
       }
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobFile)); 
+    
+    /* 
+     PATCH NOTE
+        Allow the parent process to copy config to the child job, to enable
+     specification of the child launcher and classpath in TaskRunner
+     */
+    JobConf conf = new JobConf(rjob.jobFile);
+    for ( Map.Entry<String, String> entry : fConf ) {
+        if ( entry.getKey().startsWith( "mapred.child" ) ) {
+            conf.set( entry.getKey(), entry.getValue() );
+        }
+    }
+    
+    launchTaskForJob(tip, conf); 
   }
     
  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/ReduceTask.java   (revision 570771)
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java   (working copy)
@@ -237,6 +237,11 @@
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    /* 
+      PATCH NOTE
+      See comment in org.apache.hadoop.mapred.Task 
+    */
+    configureClasspath(job);
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                                            job.getReducerClass(), job);
 
@@ -735,46 +740,14 @@
       }
       
     }
-    
-    private void configureClasspath(JobConf conf)
-      throws IOException {
-      
-      // get the task and the current classloader which will become the parent
-      Task task = ReduceTask.this;
-      ClassLoader parent = conf.getClassLoader();   
-      
-      // get the work directory which holds the elements we are dynamically
-      // adding to the classpath
-      File workDir = new File(task.getJobFile()).getParentFile();
-      File jobCacheDir = new File(workDir.getParent(), "work");
-      ArrayList<URL> urllist = new ArrayList<URL>();
-      
-      // add the jars and directories to the classpath
-      String jar = conf.getJar();
-      if (jar != null) {      
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            urllist.add(libs[i].toURL());
-          }
-        }
-        urllist.add(new File(jobCacheDir, "classes").toURL());
-        urllist.add(jobCacheDir.toURL());
+    /* 
+     PATCH NOTE
+        Moved configureClasspath to the parent class (Task)
+     */    
         
-      }
-      urllist.add(workDir.toURL());
-      
-      // create a new classloader with the old classloader as its parent
-      // then set that classloader as the one used by the current jobconf
-      URL[] urls = urllist.toArray(new URL[urllist.size()]);
-      URLClassLoader loader = new URLClassLoader(urls, parent);
-      conf.setClassLoader(loader);
-    }
-    
     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
       throws IOException {
       
-      configureClasspath(conf);
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;      
       this.reduceTask = ReduceTask.this;
Index: src/java/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/MapTask.java      (revision 570771)
+++ src/java/org/apache/hadoop/mapred/MapTask.java      (working copy)
@@ -115,11 +115,18 @@
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
+    /* 
+      PATCH NOTE
+      See comment in org.apache.hadoop.mapred.Task 
+    */
+    configureClasspath(job);
+    
     final Reporter reporter = getReporter(umbilical);
 
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
 
+    
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;
