Thx very much, sorry for the spam in previous mail in that case.

Yep agreed, most of the changes were minor changes really - I'll do as you suggest and submit jira issues for sensible sub groups.

Regards,

Dave

Michael Bieniosek wrote:
The "hadoop way" of submitting patches is to create a JIRA issue for each
patch so they can be tested and discussed separately.  It looks like you
have several unrelated changes in there.  You'll also need to regenerate
your patches against HEAD.

It's always nice to have more contributors.  I'm glad to hear you find
hadoop useful.

-Michael

On 9/5/07 9:04 AM, "David Savage" <[EMAIL PROTECTED]> wrote:

Hi All,

Just popping my head up over the lurkers parapet for a second to let you
know about some development work I've been doing regarding getting
Hadoop to run in an OSGi environment.

Sorry if this is the wrong forum for this post. I did consider sending
this to the hadoop-dev list but on inspection of the traffic there it
seemed to be mostly Jira issue tracking of which this is not yet a part....

In order to get Hadoop to work in an OSGi environment I had to make a
couple of changes to the Hadoop core, mostly with regard to better
handling of classloading in the TaskTracker$Child process.

Attached is a patch file of the changes I made which is compiled against
the 0.14.0 svn tag - comments are included at each diff point to explain
what I'm doing. Obviously really happy for these changes to be added to
Hadoop or for you to suggest any other changes or improvements.

Short background with some product placement - please skip if not in
slightest bit interested...

The background to this work was to get Hadoop running against our
product Infiniflow of which there is an opensource component - Newton
(http://newton.codecauldron.org).

Newton is a distributed runtime based on OSGi which handles runtime
deployment and failover of SCA composites over a compute fabric (our
marketing term for bunch of hetrogenous servers).

Final blurb...

If anyone is able to make it I should hopefully be presenting some of
this work at the NYJavaSIG on 17th October at a session entitled "OSGi
In The Enterprise".

Regards,

Dave Savage
Software Engineer
Paremus Ltd
http://www.paremus.com
http://www.codecauldron.org


_______________________________________________________________________
Paremus Limited. Registered in England
 No. 4181472
 Registered Office: St Alphage House, 2 Fore Street, London, EC2Y 5DH Postal
Address: 107-111 Fleet Street, London, EC4A 2AB
The information transmitted is intended only for the person(s) or entity to
which it is addressed and may contain confidential and/or privileged material.
Any review, retransmission, dissemination or other use of, or taking of any
action in reliance upon, this information by persons or entities other than
the intended recipient is prohibited.
If you received this in error, please contact the sender and delete the
material from any computer.
_______________________________________________________________________
Index: src/java/org/apache/hadoop/mapred/TaskLog.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskLog.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskLog.java (working copy)
@@ -198,8 +198,12 @@
                                                 File stderrFilename,
                                                 long tailLength
                                                ) throws IOException {
-    String stdout = FileUtil.makeShellPath(stdoutFilename);
-    String stderr = FileUtil.makeShellPath(stderrFilename);
+ /* + PATCH NOTE
+  Use absolute filename to get around problem where parent process not in
same dir as child
+     */
+    String stdout = FileUtil.makeShellPath(stdoutFilename.getAbsoluteFile());
+    String stderr = FileUtil.makeShellPath(stderrFilename.getAbsoluteFile());
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
Index: src/java/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/java/org/apache/hadoop/mapred/Task.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/Task.java (working copy)
@@ -40,6 +40,16 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+/* + PATCH NOTE
+ Imports to support patch below
+*/
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import org.apache.hadoop.filecache.DistributedCache;
+
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
   private static final Log LOG =
@@ -205,6 +215,86 @@
     }
   }
+ /* + PATCH NOTE
+    This logic is basically a cut & paste of work done in
org.apache.hadoop.mapred.TaskRunner
+ The reason for doing this here is to get around the issue where the system
classpath
+    does not contain the Hadoop api classes (i.e. Reducer, Mapper etc)
instead these classes
+    are found in osgi classpath loaded from parent classloader.
+  */
+  public void configureClasspath(JobConf conf)
+  throws IOException {
+
+   // get the task and the current classloader which will become the parent
+   ClassLoader parent = conf.getClassLoader();
+
+   // get the work directory which holds the elements we are dynamically
+   // adding to the classpath
+   File workDir = new File(getJobFile()).getParentFile();
+   File jobCacheDir = new File(workDir.getParent(), "work");
+   ArrayList<URL> urllist = new ArrayList<URL>();
+
+   // add the jars and directories to the classpath
+   String jar = conf.getJar();
+   if (jar != null) {
+    File[] libs = new File(jobCacheDir, "lib").listFiles();
+    if (libs != null) {
+     for (int i = 0; i < libs.length; i++) {
+      urllist.add(libs[i].toURL());
+     }
+    }
+    urllist.add(new File(jobCacheDir, "classes").toURL());
+    urllist.add(jobCacheDir.toURL());
+
+   }
+ + URI[] archives = DistributedCache.getCacheArchives(conf);
+      URI[] files = DistributedCache.getCacheFiles(conf);
+      // include the user specified classpath
+ + //archive paths
+      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+      if (archiveClasspaths != null && archives != null) {
+        Path[] localArchives = DistributedCache
+          .getLocalCacheArchives(conf);
+        if (localArchives != null){
+          for (int i=0;i<archives.length;i++){
+            for(int j=0;j<archiveClasspaths.length;j++){
+              if (archives[i].getPath().equals(
+ archiveClasspaths[j].toString())){
+                urllist.add(localArchives[i].toUri().toURL());
+              }
+            }
+          }
+        }
+      }
+      //file paths
+      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+      if (fileClasspaths!=null && files != null) {
+        Path[] localFiles = DistributedCache
+          .getLocalCacheFiles(conf);
+        if (localFiles != null) {
+          for (int i = 0; i < files.length; i++) {
+            for (int j = 0; j < fileClasspaths.length; j++) {
+              if (files[i].getPath().equals(
+                                            fileClasspaths[j].toString())) {
+                  urllist.add(localFiles[i].toUri().toURL());
+              }
+            }
+          }
+        }
+      }
+ + urllist.add(workDir.toURL());
+
+   // create a new classloader with the old classloader as its parent
+   // then set that classloader as the one used by the current jobconf
+   URL[] urls = urllist.toArray(new URL[urllist.size()]);
+   URLClassLoader loader = new URLClassLoader(urls, parent);
+   conf.setClassLoader(loader);
+  }
+
+ /** Run this task as a part of the named job. This method is executed in
the
    * child process and is what invokes user-supplied map, reduce, etc.
methods.
    * @param umbilical for progress reports
Index: src/java/org/apache/hadoop/mapred/TaskRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskRunner.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java (working copy)
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.*;
 import java.io.*;
 import java.util.List;
+import java.util.StringTokenizer;
 import java.util.Vector;
 import java.net.URI;
@@ -144,69 +145,32 @@
       String sep = System.getProperty("path.separator");
       StringBuffer classPath = new StringBuffer();
       // start with same classpath as parent process
-      classPath.append(System.getProperty("java.class.path"));
+ /* + PATCH NOTE
+    qualify path to get around problem where parent process not in same dir
as child
+       */
+      classPath.append(qualifyPath(System.getProperty("java.class.path")));
       classPath.append(sep);
       if (!workDir.mkdirs()) {
         if (!workDir.isDirectory()) {
           LOG.fatal("Mkdirs failed to create " + workDir.toString());
         }
       }
- - String jar = conf.getJar();
-      if (jar != null) {
-        // if jar exists, it into workDir
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            classPath.append(sep);            // add libs from jar to
classpath
-            classPath.append(libs[i]);
-          }
-        }
-        classPath.append(sep);
-        classPath.append(new File(jobCacheDir, "classes"));
-        classPath.append(sep);
-        classPath.append(jobCacheDir);
- + /* + PATCH NOTE
+    Allow extra jars on classpath not necessarily those found on system
classpath
+       (in osgi classpath is dynamically built by installed bundles vs
statically configured
+       at system boot)
+ */ + String jobPath = conf.get( "mapred.child.java.classpath" );
+      if ( jobPath != null ) {
+       classPath.append( jobPath );
+       classPath.append( sep );
       }
-
-      // include the user specified classpath
- - //archive paths
-      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-      if (archiveClasspaths != null && archives != null) {
-        Path[] localArchives = DistributedCache
-          .getLocalCacheArchives(conf);
-        if (localArchives != null){
-          for (int i=0;i<archives.length;i++){
-            for(int j=0;j<archiveClasspaths.length;j++){
-              if (archives[i].getPath().equals(
- archiveClasspaths[j].toString())){
-                classPath.append(sep);
-                classPath.append(localArchives[i]
-                                 .toString());
-              }
-            }
-          }
-        }
-      }
-      //file paths
-      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-      if (fileClasspaths!=null && files != null) {
-        Path[] localFiles = DistributedCache
-          .getLocalCacheFiles(conf);
-        if (localFiles != null) {
-          for (int i = 0; i < files.length; i++) {
-            for (int j = 0; j < fileClasspaths.length; j++) {
-              if (files[i].getPath().equals(
-                                            fileClasspaths[j].toString())) {
-                classPath.append(sep);
-                classPath.append(localFiles[i].toString());
-              }
-            }
-          }
-        }
-      }
-
+ /* + PATCH NOTE
+    Logic moved to Task.configureClasspath - see patch comment there
+       */
       classPath.append(sep);
       classPath.append(workDir);
       //  Build exec child jmv args.
@@ -275,7 +239,16 @@
         vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
         vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
- // Add main class and its arguments
+        // Add main class and its arguments
+ /* + PATCH NOTE
+      Allow client to specify a launcher that launches your main class
+         This allows me to substitute an osgi enabled jvm
+         */
+        String launcher = conf.get( "mapred.child.launcher" );
+        if ( launcher != null ) {
+         vargs.add( launcher );
+        }
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
         // pass umbilical port
         vargs.add(Integer.toString(tracker.getTaskTrackerReportPort()));
@@ -328,7 +301,41 @@
   }
+ /* + PATCH NOTE
+  Utility method to qualify system classpath - could be more efficient,
(StringTokenizer bleugh)
+  */
   /**
+ * @param property
+ * @return
+ */
+ private String qualifyPath( String property ) {
+ StringBuffer buf = new StringBuffer(property.length() * 10);
+ String sep = System.getProperty("path.separator");
+ + StringTokenizer tok = new StringTokenizer(property, sep); + + while( tok.hasMoreTokens() ) {
+  String element = tok.nextToken();
+ + if ( element.trim().length() > 0 ) {
+   File f = new File( element );
+ + if ( f.exists() ) {
+    buf.append( f.getAbsolutePath() );
+   }
+   else {
+    throw new IllegalStateException( "Unknown path element " + tok );
+   }
+ + buf.append( sep );
+  }
+ }
+ + return buf.toString();
+}
+
+/**
    * Handle deprecated mapred.child.heap.size.
    * If present, interpolate into mapred.child.java.opts value with
    * warning.
@@ -399,6 +406,11 @@
       int exit_code = process.waitFor();
if (!killed && exit_code != 0) { + /* + PATCH NOTE
+           Debug to help diagnose problems in launching process
+           */
+          LOG.warn( "Failed to execute : " + argsToString( args ) );
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");
       }
@@ -408,6 +420,24 @@
kill(); }
   }
+ + /* + PATCH NOTE
+   Utility method used in logging
+  */
+  private String argsToString( List<String> args ) {
+      StringBuffer buf = new StringBuffer();
+ + for ( String arg : args ) {
+          if ( buf.length() != 0 ) {
+              buf.append( ' ' );
+          }
+ + buf.append( arg );
+      }
+ + return buf.toString();
+  }
/**
    * Kill the child process
Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskTracker.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java (working copy)
@@ -21,6 +21,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,6 +29,8 @@
 import java.io.PrintStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -641,7 +644,20 @@
         rjob.localized = true;
       }
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobFile));
+ + /* + PATCH NOTE
+  Allow parent proccess to copy config to child job to enable
+     specification of child launcher and classpath in TaskRunner
+     */
+    JobConf conf = new JobConf(rjob.jobFile);
+    for ( Map.Entry<String, String> entry : fConf ) {
+        if ( entry.getKey().startsWith( "mapred.child" ) ) {
+            conf.set( entry.getKey(), entry.getValue() );
+        }
+    }
+ + launchTaskForJob(tip, conf);
   }
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws
IOException{
Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/ReduceTask.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java (working copy)
@@ -237,6 +237,11 @@
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+ /* + PATCH NOTE
+      See comment in org.apache.hadoop.mapred.Task
+    */
+ configureClasspath(job);
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
job.getReducerClass(), job); @@ -735,46 +740,14 @@
       }
} - - private void configureClasspath(JobConf conf)
-      throws IOException {
- - // get the task and the current classloader which will become the
parent
-      Task task = ReduceTask.this;
-      ClassLoader parent = conf.getClassLoader();
- - // get the work directory which holds the elements we are dynamically
-      // adding to the classpath
-      File workDir = new File(task.getJobFile()).getParentFile();
-      File jobCacheDir = new File(workDir.getParent(), "work");
-      ArrayList<URL> urllist = new ArrayList<URL>();
- - // add the jars and directories to the classpath
-      String jar = conf.getJar();
-      if (jar != null) {
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            urllist.add(libs[i].toURL());
-          }
-        }
-        urllist.add(new File(jobCacheDir, "classes").toURL());
-        urllist.add(jobCacheDir.toURL());
+ /* + PATCH NOTE
+  Move configureClasspath to parent class
+ */ - }
-      urllist.add(workDir.toURL());
- - // create a new classloader with the old classloader as its parent
-      // then set that classloader as the one used by the current jobconf
-      URL[] urls = urllist.toArray(new URL[urllist.size()]);
-      URLClassLoader loader = new URLClassLoader(urls, parent);
-      conf.setClassLoader(loader);
-    }
- public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
       throws IOException {
- configureClasspath(conf);
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;
       this.reduceTask = ReduceTask.this;
Index: src/java/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/MapTask.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/MapTask.java (working copy)
@@ -115,11 +115,18 @@
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+ /* + PATCH NOTE
+      See comment in org.apache.hadoop.mapred.Task
+    */
+    configureClasspath(job);
+ final Reporter reporter = getReporter(umbilical); // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
+ int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;


_______________________________________________________________________
This email has been scanned by both Message Labs and Paremus internal systems 
for viruses and inappropriate attachments.  Email suspected of carrying a virus 
payload or inappropriate attachment will not be delivered to you, and the 
sender will receive an appropriate warning notice.
_______________________________________________________________________


_______________________________________________________________________
Paremus Limited. Registered in England
No. 4181472
Registered Office: St Alphage House, 2 Fore Street, London, EC2Y 5DH Postal 
Address: 107-111 Fleet Street, London, EC4A 2AB
The information transmitted is intended only for the person(s) or entity to 
which it is addressed and may contain confidential and/or privileged material. 
Any review, retransmission, dissemination or other use of, or taking of any 
action in reliance upon, this information by persons or entities other than the 
intended recipient is prohibited.
If you received this in error, please contact the sender and delete the 
material from any computer.
_______________________________________________________________________

Reply via email to