Author: omalley
Date: Mon Aug 18 15:22:16 2008
New Revision: 686883
URL: http://svn.apache.org/viewvc?rev=686883&view=rev
Log:
HADOOP-2130. Pipes submit job should have both blocking and non-blocking
versions. (acmurthy via omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686883&r1=686882&r2=686883&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 18 15:22:16 2008
@@ -222,6 +222,9 @@
HADOOP-3816. Faster directory listing in KFS. (Sriram Rao via omalley)
+ HADOOP-2130. Pipes submit job should have both blocking and non-blocking
+ versions. (acmurthy via omalley)
+
BUG FIXES
HADOOP-3563. Refactor the distributed upgrade code so that it is
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java?rev=686883&r1=686882&r2=686883&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java Mon Aug 18 15:22:16 2008
@@ -209,8 +209,40 @@
* to the job to run under pipes are made to the configuration.
* @param conf the job to submit to the cluster (MODIFIED)
* @throws IOException
+ * @deprecated Use {@link Submitter#runJob(JobConf)}
*/
+ @Deprecated
public static RunningJob submitJob(JobConf conf) throws IOException {
+ return runJob(conf);
+ }
+
+ /**
+ * Submit a job to the map/reduce cluster. All of the necessary modifications
+ * to the job to run under pipes are made to the configuration.
+ * @param conf the job to submit to the cluster (MODIFIED)
+ * @throws IOException
+ */
+ public static RunningJob runJob(JobConf conf) throws IOException {
+ setupPipesJob(conf);
+ return JobClient.runJob(conf);
+ }
+
+ /**
+ * Submit a job to the Map-Reduce framework.
+ * This returns a handle to the {@link RunningJob} which can be used to track
+ * the running-job.
+ *
+ * @param conf the job configuration.
+ * @return a handle to the {@link RunningJob} which can be used to track the
+ * running-job.
+ * @throws IOException
+ */
+ public static RunningJob jobSubmit(JobConf conf) throws IOException {
+ setupPipesJob(conf);
+ return new JobClient(conf).submitJob(conf);
+ }
+
+ private static void setupPipesJob(JobConf conf) throws IOException {
// default map output types to Text
if (!getIsJavaMapper(conf)) {
conf.setMapRunnerClass(PipesMapRunner.class);
@@ -258,7 +290,6 @@
throw ie;
}
DistributedCache.setCacheFiles(fileCache, conf);
- return JobClient.runJob(conf);
}
/**
@@ -414,7 +445,7 @@
pathToFile(new
Path(jarFile)).toURL()});
conf.setClassLoader(loader);
}
- submitJob(conf);
+ runJob(conf);
} catch (OptionException oe) {
cli.printUsage();
}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=686883&r1=686882&r2=686883&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Mon Aug 18 15:22:16 2008
@@ -149,8 +149,21 @@
Submitter.setIsJavaRecordWriter(job, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
- RunningJob result = Submitter.submitJob(job);
- assertTrue("pipes job failed", result.isSuccessful());
+ RunningJob rJob = null;
+ if (numReduces == 0) {
+ rJob = Submitter.jobSubmit(job);
+
+ while (!rJob.isComplete()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ }
+ } else {
+ rJob = Submitter.runJob(job);
+ }
+ assertTrue("pipes job failed", rJob.isSuccessful());
}
List<String> results = new ArrayList<String>();