On Feb 1, 2011, at 11:40 PM, Keith Wiley wrote:
> I would really appreciate any help people can offer on the following matters.
>
> When running a streaming job, -D, -files, -libjars, and -archives don't seem
> to work, but -jobconf, -file, -cacheFile, and -cacheArchive do. With the
> first four parameters anywhere in the command I always get a "Streaming
> Command Failed!" error. The last four work, though. Note that some of those
> parameters (-files) do work when I run a Hadoop job in the normal framework,
> just not when I specify the streaming jar.
There are some issues with how the streaming jar processes the command
line, especially in 0.20: the options need to be in the correct order. In
general, the -D's need to come *before* the rest of the streaming params. This
is what works for me:
hadoop \
jar \
`ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
-Dmapred.reduce.tasks.speculative.execution=false \
-Dmapred.map.tasks.speculative.execution=false \
-Dmapred.job.name="oh noes aw is doing perl again" \
-input ${ATTEMPTIN} \
-output ${ATTEMPTOUT} \
-mapper map.pl \
-reducer reduce.pl \
-file map.pl \
-file reduce.pl
> I have found examples online, but they always reference "built-in" classes.
> If I try to use my own class, the job tracker produces a "Cannot run program
> "org.uw.astro.coadd.Reducer2": java.io.IOException: error=2, No such file or
> directory" error.
I wouldn't be surprised if it is a bug. It might be worthwhile to dig
into the streaming jar to figure out how it determines whether something is a
class or not. [It might even do something dumb like "is it org.apache.blah?"]
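If you do go digging, the place to start is probably
org.apache.hadoop.streaming.StreamJob, which is where the -mapper/-reducer
arguments get turned into either a class or an external command. Something
like this will point you at the relevant code, assuming your tarball ships
the contrib source (the path is my guess at the usual 0.20 layout):

# Path is a guess at the standard 0.20 source layout; adjust it if your
# distribution keeps the contrib source somewhere else.
STREAMING_SRC=$HADOOP_HOME/src/contrib/streaming/src/java/org/apache/hadoop/streaming
grep -n "reducer" $STREAMING_SRC/StreamJob.java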
> How do I force a single record (input file) to be processed by a single
> mapper to get maximum parallelism?
> All I found online was this terse description (of an example that gzips
> files, not my application):
> • Generate a file containing the full HDFS paths of the input files.
> Each map task would get one file name as input.
> • Create a mapper script which, given a filename, will get the file to
> local disk, gzip the file, and put it back in the desired output directory.
These work, but are less than ideal.
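If you do go that route, the mapper is usually just a small wrapper script,
something like the sketch below (the output directory, the gzip step, and the
lack of error handling are all purely illustrative):

#!/bin/sh
# Each line on stdin is taken to be a full HDFS path (per the first bullet).
OUTDIR=/user/me/gzipped                    # illustrative; use your own location
while read hdfspath; do
  name=`basename "$hdfspath"`
  hadoop fs -get "$hdfspath" "$name"       # pull the file to local disk
  gzip "$name"                             # the real per-file work goes here
  hadoop fs -put "$name.gz" "$OUTDIR/"     # push the result back to HDFS
  echo "$hdfspath"                         # emit something so the task isn't silent
done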
> I don't understand exactly what that means or how to go about doing it. In
> the normal Hadoop framework I have achieved this goal by setting
> mapred.max.split.size small enough that only one input record fits (about
> 6MB), but I tried that with my streaming job via "-jobconf
> mapred.max.split.size=X", where X is a very low number, roughly the size of
> a single streaming input record (which in the streaming case is not 6MB but
> merely ~100 bytes, just a filename referenced via -cacheFile). It didn't
> work; it sent multiple records to each mapper anyway.
What you actually want to do is set mapred.min.split.size to an
extremely high value. Setting max.split.size only works on Combined- and
MultiFile-InputFormat for some reason.
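For a streaming job, that means putting it at the front of the argument list
as a -D, just like the other properties in the example above. The exact value
doesn't matter as long as it's bigger than any single input file:

hadoop \
jar \
`ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
-Dmapred.min.split.size=999999999999 \
-input ${ATTEMPTIN} \
-output ${ATTEMPTOUT} \
-mapper map.pl \
-reducer reduce.pl \
-file map.pl \
-file reduce.pl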
Also, you might be able to change the inputformat. My experiences with
doing this are Not Good(tm).
> Achieving 1-to-1 parallelism between map tasks, nodes, and input records is
> very important because my map tasks take a very long time to run, upwards of
> an hour. I cannot have them queueing up on a small number of nodes while
> there are numerous unused nodes (task slots) available to be doing work.
If all the task slots are in use, why would you care if they are
queueing up? Also keep in mind that if a node fails, that work will need to
get re-done anyway.