[ http://issues.apache.org/jira/browse/HADOOP-811?page=comments#action_12458274 ] Hadoop QA commented on HADOOP-811: ----------------------------------
+1, since http://issues.apache.org/jira/secure/attachment/12347069/diff.txt applied and successfully tested against trunk revision r486810. > Patch to support multi-threaded MapRunnable > ------------------------------------------- > > Key: HADOOP-811 > URL: http://issues.apache.org/jira/browse/HADOOP-811 > Project: Hadoop > Issue Type: New Feature > Components: mapred > Affects Versions: 0.10.0 > Environment: all > Reporter: Alejandro Abdelnur > Attachments: diff.txt, diff.txt, MultithreadedMapRunner.java, > MultithreadedMapRunner.java > > > The MapRunner calls Mapper.map in a serialized fashion. > This is suitable for CPU/memory bound operations. > However, when doing IO bound operations this serialization affects the > throughput significantly. > In order to support IO bound operations more efficiently I've implemented a > multithreaded MapRunnable. > Following is the implementation of this MapRunnable, MultithreadedMapRunner. > I've only had to modify one method in the existing code (in the Task class) to > avoid data corruption in the reporter. 
> Index: Task.java > =================================================================== > --- Task.java (revision 485492) > +++ Task.java (working copy) > @@ -153,9 +153,11 @@ > public Reporter getReporter(final TaskUmbilicalProtocol umbilical, > final Progress progress) throws IOException { > return new Reporter() { > - public void setStatus(String status) throws IOException { > - progress.setStatus(status); > - progress(); > + public void setStatus(String status) throws IOException { > + synchronized (this) { > + progress.setStatus(status); > + progress(); > + } > } > public void progress() throws IOException { > reportProgress(umbilical); > ----------------------------------------------------------- > MultithreadedMapRunner.java > package org.apache.hadoop.mapred; > import org.apache.hadoop.util.ReflectionUtils; > import org.apache.hadoop.io.WritableComparable; > import org.apache.hadoop.io.Writable; > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import java.io.IOException; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > import java.util.concurrent.TimeUnit; > /** > * Multithreaded implementation for @link > org.apache.hadoop.mapred.MapRunnable. > * <p> > * It can be used instead of the default implementation, > * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU > * bound in order to improve throughput. > * <p> > * Map implementations using this MapRunnable must be thread-safe. > * <p> > * The Map-Reduce job has to be configured to use this MapRunnable class > (using > * the <b>mapred.map.runner.class</b> property) and > * the number of thread the thread-pool can use (using the > * <b>mapred.map.multithreadedrunner.threads</b> property). 
> * <p> > * > * @author Alejandro Abdelnur > */ > public class MultithreadedMapRunner implements MapRunnable { > private static final Log LOG = > LogFactory.getLog(MultithreadedMapRunner.class.getName()); > private JobConf job; > private Mapper mapper; > private ExecutorService executorService; > private IOException ioException; > public void configure(JobConf job) { > int numberOfThreads = > job.getInt("mapred.map.multithreadedrunner.threads", 10); > if (LOG.isDebugEnabled()) { > LOG.debug("Configuring job " + job.getJobName() + > " to use " + numberOfThreads + " threads" ); > } > this.job = job; > this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(), > job); > // Creating a threadpool of the configured size to execute the Mapper > // map method in parallel. > executorService = Executors.newFixedThreadPool(numberOfThreads); > } > public void run(RecordReader input, OutputCollector output, > Reporter reporter) > throws IOException { > try { > // allocate key & value instances these objects will not be reused > // because execution of Mapper.map is not serialized. > WritableComparable key = input.createKey(); > Writable value = input.createValue(); > while (input.next(key, value)) { > // Run Mapper.map execution asynchronously in a separate thread. > // If threads are not available from the thread-pool this method > // will block until there is a thread available. > executorService.execute( > new MTMapperRunable(key, value, output, reporter)); > // Checking if a Mapper.map within a Runnable has generated an > // IOException. If so we rethrow it to force an abort of the Map > // operation thus keeping the semantics of the default > // implementation. 
> if (ioException != null) { > throw ioException; > } > // Allocate new key & value instances as mapper is running in parallel > key = input.createKey(); > value = input.createValue(); > } > if (LOG.isDebugEnabled()) { > LOG.debug("Finished dispatching all Mappper.map calls, job " > + job.getJobName()); > } > // Graceful shutdown of the Threadpool, it will let all scheduled > // Runnables to end. > executorService.shutdown(); > try { > // Now waiting for all Runnables to end. > while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) > { > if (LOG.isDebugEnabled()) { > LOG.debug("Awaiting all running Mappper.map calls to finish, job " > + job.getJobName()); > } > // Checking if a Mapper.map within a Runnable has generated an > // IOException. If so we rethrow it to force an abort of the Map > // operation thus keeping the semantics of the default > // implementation. > // NOTE: while Mapper.map dispatching has concluded there are still > // map calls in progress. > if (ioException != null) { > throw ioException; > } > } > // Checking if a Mapper.map within a Runnable has generated an > // IOException. If so we rethrow it to force an abort of the Map > // operation thus keeping the semantics of the default > // implementation. > // NOTE: it could be that a map call has had an exception after the > // call for awaitTermination() returing true. And edge case but it > // could happen. > if (ioException != null) { > throw ioException; > } > } > catch (IOException ioEx) { > // Forcing a shutdown of all thread of the threadpool and rethrowing > // the IOException > executorService.shutdownNow(); > throw ioEx; > } > catch (InterruptedException iEx) { > throw new IOException(iEx.getMessage()); > } > } finally { > mapper.close(); > } > } > /** > * Runnable to execute a single Mapper.map call from a forked thread. 
> */ > private class MTMapperRunable implements Runnable { > private WritableComparable key; > private Writable value; > private OutputCollector output; > private Reporter reporter; > /** > * Collecting all required parameters to execute a Mapper.map call. > * <p> > * > * @param key > * @param value > * @param output > * @param reporter > */ > public MTMapperRunable(WritableComparable key, Writable value, > OutputCollector output, Reporter reporter) { > this.key = key; > this.value = value; > this.output = output; > this.reporter = reporter; > } > /** > * Executes a Mapper.map call with the given Mapper and parameters. > * <p> > * This method is called from the thread-pool thread. > * > */ > public void run() { > try { > // map pair to output > MultithreadedMapRunner.this.mapper.map(key, value, output, reporter); > } > catch (IOException ex) { > // If there is an IOException during the call it is set in an instance > // variable of the MultithreadedMapRunner from where it will be > // rethrown. > synchronized (MultithreadedMapRunner.this) { > if (MultithreadedMapRunner.this.ioException == null) { > MultithreadedMapRunner.this.ioException = ex; > } > } > } > } > } > } -- This message is automatically generated by JIRA. - If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa - For more information on JIRA, see: http://www.atlassian.com/software/jira