We found Map/Reduce job composition to be useful when intermediate result
sizes were comparable to or larger than the size of their input.
We implemented a prototype where intermediate Map/Reduce steps are composed
into a single task. Pictorially, we transform a sequence of Map and Reduce
jobs:
M -> R, M -> R, M -> R // default behavior
into:
M -> R|M -> R|M -> R // with composition
Here, “R|M” denotes the composition of a Reduce (R) with a subsequent Map
(M), allowing the output of R to be directly consumed by M. In our
prototype, the programmer provides the same Map and Reduce classes as
before, as well as the sequence of Map/Reduce jobs to run. Then, under the
covers, the system takes care of composing the intermediate Map and Reduce
steps.
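To make the composition concrete, one way to build an R|M task is to wrap the
user's Reduce class so that every record it emits is handed directly to the
next job's Map class instead of being written to HDFS. The sketch below only
illustrates that idea and is not the actual code in our prototype; the
ComposedReduceMap name is made up, and it is written against the generic
org.apache.hadoop.mapred interfaces.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical sketch of a composed R|M task: the wrapped Reduce's output is
// handed directly to the next job's Map instead of being written to HDFS.
public class ComposedReduceMap<K1, V1, K2, V2, K3, V3>
    implements Reducer<K1, V1, K3, V3> {

  private final Reducer<K1, V1, K2, V2> reduce;  // Reduce step of job i
  private final Mapper<K2, V2, K3, V3> nextMap;  // Map step of job i+1

  public ComposedReduceMap(Reducer<K1, V1, K2, V2> reduce,
                           Mapper<K2, V2, K3, V3> nextMap) {
    this.reduce = reduce;
    this.nextMap = nextMap;
  }

  public void reduce(K1 key, Iterator<V1> values,
                     final OutputCollector<K3, V3> output,
                     final Reporter reporter) throws IOException {
    // Every (key, value) pair the Reduce emits is piped straight into the
    // next Map; whatever that Map emits is collected for the following
    // Reduce step, with no HDFS write or read in between.
    OutputCollector<K2, V2> pipe = new OutputCollector<K2, V2>() {
      public void collect(K2 k, V2 v) throws IOException {
        nextMap.map(k, v, output, reporter);
      }
    };
    reduce.reduce(key, values, pipe, reporter);
  }

  // Configuration and cleanup of the wrapped Reduce and Map are omitted
  // for brevity.
  public void configure(JobConf job) { }

  public void close() throws IOException { }
}

The framework still shuffles whatever the wrapped Map emits to the following
Reduce step; only the HDFS write and re-read between the two jobs disappear.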
To illustrate how this can improve performance, denote a read and write to
HDFS as “rh” and “wh”, respectively, and denote a read and write to the
local file system as “rl” and “wl”, respectively. Finally, denote a network
transfer by “n”. Then, in terms of disk and network I/O, the two cases
above can be described in more detail as:
M(rh, wl, n) -> R(wl, rl, wh), M(rh, wl, n) -> R(wl, rl, wh) // default behavior
versus:
M(rh, wl, n) -> R|M(wl, rl, wl, n) -> R(wl, rl, wh) // with composition
Comparing the two cases, there are two key points to notice: 1) each
composed R|M task eliminates an HDFS read and write; and 2) if a node with an
R|M task fails before its output is consumed, the whole task must be
re-evaluated, since its data is not written to HDFS. This re-evaluation can
potentially cascade back to a Map task from the first step, but it does not
necessarily require that the whole job be re-evaluated.
To compare the two cases analytically, assume for the sake of simplicity
that the size of the input data is equal to the size of intermediate and
output data. Let a “pass” over the data denote reading or writing the data
to disk or transferring the data over the network. Then with N Map/Reduce
jobs, we have:
(N jobs * 2 tasks/job * 3 passes/task) = 6*N passes // default behavior
versus:
3 + 4*(N-1) + 3 passes // with composition
Comparing the two cases, we have 6 + 6*(N-1) passes versus 6 + 4*(N-1)
passes, a savings that approaches 33% with composition as N grows.
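As a concrete instance of the formulas above, under the same simplifying
assumptions, with N = 3 jobs:
6*3 = 18 passes // default behavior
3 + 4*(3-1) + 3 = 14 passes // with composition
That is roughly a 22% savings at N = 3, growing toward 33% for longer
sequences of jobs.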
With our prototype running on a 10-node cluster, we achieved a 15 to 20%
speedup on a workload with 3- to 5-way joins using Map/Reduce composition.
Note that we used a replication factor of 1 when writing HDFS in both
cases. While we have not yet explored scheduling and other optimization
issues, knowing that multiple jobs are related may lead to further
improvements in performance.
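As an aside on the replication factor: lowering it for a job's HDFS output
does not require anything prototype-specific. Here is a minimal sketch,
assuming the standard "dfs.replication" property that the HDFS client reads
from the job configuration; the class and method names are made up for
illustration only.

import org.apache.hadoop.mapred.JobConf;

public class SingleReplicaOutput {
  // Ask HDFS to create this job's output files with a single replica.
  // "dfs.replication" is the standard client-side replication property;
  // tasks inherit it from the job configuration when they write to HDFS.
  public static JobConf withSingleReplica(JobConf conf) {
    conf.setInt("dfs.replication", 1);
    return conf;
  }
}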
Our prototype did add complexity to the JobTracker, since now a sequence of
Map/Reduce jobs must be configured and tracked. This makes job.xml more
complicated. Also, as noted earlier, an intermediate R|M step writes its
output to the local file system instead of HDFS. So if its node fails, the
output on that node is lost. But if the output of R|M steps is written to
HDFS instead, then we can still avoid 2 passes over the data, while
benefiting from the fault tolerance that HDFS provides to avoid cascading
re-evaluation. We did not implement this, however.
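To give a feel for the extra configuration of a job sequence, here is a
hypothetical driver sketch for declaring three chained jobs. ComposedJobRunner
and its addStep/run methods are invented names used only for illustration and
should not be read as our prototype's actual interface; JobConf, the path
setters, and the Identity classes are standard Hadoop, with
IdentityMapper/IdentityReducer standing in for the user's real classes.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class ComposedJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(ComposedJobDriver.class);
    conf.setJobName("composed-sequence");
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Hypothetical helper: list the Map and Reduce classes for each step;
    // under the covers the intermediate Reduce and Map steps are composed
    // into single R|M tasks.
    ComposedJobRunner runner = new ComposedJobRunner(conf);
    runner.addStep(IdentityMapper.class, IdentityReducer.class); // M -> ...
    runner.addStep(IdentityMapper.class, IdentityReducer.class); // R|M -> ...
    runner.addStep(IdentityMapper.class, IdentityReducer.class); // R|M -> R
    runner.run(); // submits and tracks the whole chain as one job
  }
}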
To summarize, Map/Reduce composition can improve the performance of a
sequence of Map/Reduce jobs by eliminating disk I/O for intermediate
results. However, it comes with increased system complexity and possibly
more re-evaluations in the face of node failures. We would be interested in
getting more feedback on whether people think some of the ideas regarding
Map/Reduce composition are worth considering in Hadoop.
Thanks!
Vuk
Milind Bhandarkar <[EMAIL PROTECTED]c.com>
To: <[email protected]>
Date: 08/29/2007 11:10 PM
Subject: Re: Poly-reduce?
Please respond to: [EMAIL PROTECTED]e.apache.org
I agree with Owen and Doug. As long as the intermediate outputs (i.e., data
in between phases) are stored on tasktrackers' local disks, which are prone
to failure, having more than two phases will be counterproductive. If
intermediate data storage were on a fault-tolerant DFS, one would see more
benefit from chaining an arbitrary sequence of phases. (But then the
reasoning in the original email for having multiple phases, i.e. not having
to upload data to DFS, would no longer be valid.)
- milind
On 8/24/07 9:53 AM, "Doug Cutting" <[EMAIL PROTECTED]> wrote:
> Ted Dunning wrote:
>> It isn't hard to implement these programs as multiple fully fledged
>> map-reduces, but it appears to me that many of them would be better
>> expressed as something more like a map-reduce-reduce program.
>>
>> [ ... ]
>>
>> Expressed conventionally, this would have to write all of the user sessions
>> to HDFS and a second map phase would generate the pairs for counting. The
>> opportunity for efficiency would come from the ability to avoid writing
>> intermediate results to the distributed data store.
>>
>> Has anybody looked at whether this would help and whether it would be hard
>> to do?
>
> It would make the job tracker more complicated, and might not help job
> execution time that much.
>
> Consider implementing this as multiple map reduce steps, but using a
> replication level of one for intermediate data. That would mostly have
> the performance characteristics you want. But if a node died, the missing
> data could not be intelligently and automatically re-created.
> Instead the application would have to re-run the entire job, or subsets
> of it, in order to re-create the un-replicated data.
>
> Under poly-reduce, if a node failed, all tasks that were incomplete on
> that node would need to be restarted. But first, their input data would
> need to be located. If you saved all intermediate data in the course of
> a job (which would be expensive) then the inputs that need re-creation
> would mostly just be those that were created on the failed node. But
> this failure would generally cascade all the way back to the initial map
> stage. So a single machine failure in the last phase could double the
> run time of the job, with most of the cluster idle.
>
> If, instead, you used normal mapreduce, with intermediate data
> replicated in the filesystem, a single machine failure in the last phase
> would only require re-running tasks from the last job.
>
> Perhaps, when chaining mapreduces, one should use a lower replication
> level for intermediate data, like two. Additionally, one might wish to
> relax the one-replica-off-rack criterion for such files, so that
> replication is faster, and since whole-rack failures are rare. This
> might give good chained performance, but keep machine failures from
> knocking tasks back to the start of the chain. Currently it's not
> possible to disable the one-replica-off-rack preference, but that might
> be a reasonable feature request.
>
> Doug
>
--
Milind Bhandarkar
408-349-2136
([EMAIL PROTECTED])