Unittest for tethered map reduce in java
----------------------------------------

                 Key: AVRO-847
                 URL: https://issues.apache.org/jira/browse/AVRO-847
             Project: Avro
          Issue Type: Test
          Components: java
    Affects Versions: 1.6.0
            Reporter: Jeremy Lewi
            Priority: Minor


I was working on a unit test to run the Java example of a tethered map reduce 
job (see Issue #512). While doing so I ran into several issues. I was able 
to work around them and will submit a patch, but I'm not sure my fixes 
are correct. The issues are listed below.

1) A deadlock appears to occur if the subprocess can't start. This happens 
because the parent process is waiting for the child process to send the 
configure command, which it can't do if the process failed to start. I solved 
this by adding some code to TetheredProcess to check if the subprocess has 
already exited.

2) The tethered classes don't provide a way to pass command line arguments to 
the program executed to start the subprocess. I solved this by adding some 
appropriate keys to the job configuration XML file to specify a) the 
executable, b) the command line arguments and c) whether or not the executable 
should be distributed via the DistributedCache mechanism to all the nodes.

3) I ran into some communication problems between the child and parent 
processes. These seemed to occur because the child was using "Sasl" for the 
protocol but the parent was using "Socket" (e.g. SocketServer, 
SocketTransceiver).

4) In TetherJob.setupJob I needed to set the MapOutputKeyClass to TetherData; 
otherwise I was getting an error about the mapper expecting type AvroKey for 
the data but getting type TetherData.

5) During the sort phase, when compare was called on the keys, I was getting an 
exception (stack trace below). I think this is because the buffer collecting 
the output from the mapper wasn't being flushed. I was able to "solve" this 
problem by forcing a flush in TetherKeySerializer.serialize. My limited 
knowledge of the innards of Avro map/reduce leads me to believe this is a less 
than ideal solution. I would imagine that during the M/R job, a flush should be 
occurring at the end of the map phase and that should make it unnecessary to 
invoke flush each time serialize is invoked.

org.apache.avro.AvroRuntimeException: java.io.EOFException
    at org.apache.avro.io.BinaryData.compare(BinaryData.java:74)
    at org.apache.avro.mapred.tether.TetherKeyComparator.compare(TetherKeyComparator.java:46)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:942)
    at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:30)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:83)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1228)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1129)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:359)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
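
For issue 1, the idea of checking whether the subprocess has already exited 
can be sketched roughly as follows. This is not the code from the patch: the 
class SubprocessWatch, the method awaitConfigure, and the use of a 
CountDownLatch to stand in for "the configure command has arrived" are all 
assumptions made for illustration. The parent waits in short intervals and 
gives up as soon as the child is no longer alive, instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Avro: waits for the child's configure
// signal but fails fast if the child process has already exited.
final class SubprocessWatch {
    private SubprocessWatch() {}

    /**
     * @param childAlive reports whether the child is still running
     *                   (for a java.lang.Process this could be process::isAlive)
     * @param configured counted down when the configure command arrives
     * @param pollMillis how long to wait between liveness checks
     * @return true if the configure signal arrived, false if the child died
     *         first or the waiting thread was interrupted
     */
    static boolean awaitConfigure(BooleanSupplier childAlive,
                                  CountDownLatch configured,
                                  long pollMillis) {
        try {
            // await() returns true as soon as the latch reaches zero.
            while (!configured.await(pollMillis, TimeUnit.MILLISECONDS)) {
                if (!childAlive.getAsBoolean()) {
                    // The child may have signalled just before exiting,
                    // so check the latch one final time.
                    return configured.getCount() == 0;
                }
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report failure.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a real subprocess this might be called as 
SubprocessWatch.awaitConfigure(process::isAlive, latch, 100). On JVMs without 
Process.isAlive, catching IllegalThreadStateException from Process.exitValue 
would serve the same purpose.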
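
For issue 2, a minimal sketch of turning configuration keys into a command 
line is shown here. The key names (example.tether.*) and the class 
TetherCommandSketch are hypothetical and are not the keys used in the patch. 
java.util.Properties stands in for the Hadoop job configuration so that the 
example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: key names below are illustrative, not Avro's.
final class TetherCommandSketch {
    static final String EXEC_KEY = "example.tether.executable";
    static final String ARGS_KEY = "example.tether.executable.args";
    static final String CACHED_KEY = "example.tether.executable.cached";

    private TetherCommandSketch() {}

    /** Builds the command (executable followed by its arguments). */
    static List<String> buildCommand(Properties conf) {
        String exec = conf.getProperty(EXEC_KEY);
        if (exec == null || exec.trim().isEmpty()) {
            throw new IllegalArgumentException("missing " + EXEC_KEY);
        }
        List<String> command = new ArrayList<>();
        command.add(exec.trim());
        // Arguments are split on whitespace; quoted arguments containing
        // spaces are not handled in this sketch.
        String args = conf.getProperty(ARGS_KEY, "").trim();
        if (!args.isEmpty()) {
            for (String arg : args.split("\\s+")) {
                command.add(arg);
            }
        }
        return command;
    }

    /** True if the executable should be shipped to the nodes via the cache. */
    static boolean isCached(Properties conf) {
        return Boolean.parseBoolean(conf.getProperty(CACHED_KEY, "false"));
    }
}
```

The resulting list can be handed directly to java.lang.ProcessBuilder. 
Splitting on whitespace keeps the configuration value a single string, at the 
cost of not supporting arguments that themselves contain spaces.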
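
For issue 5, the suspected cause (serialized bytes still sitting in a buffer 
when the comparator reads them) can be illustrated with plain java.io 
classes. This is only an analogy under the assumption that the serializer 
writes through a buffering layer; FlushSketch and visibleBytes are 
hypothetical names and no Avro or Hadoop classes are involved.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration: shows how many bytes reach the underlying
// sink with and without an explicit flush of the buffering layer.
final class FlushSketch {
    private FlushSketch() {}

    static int visibleBytes(byte[] record, boolean flush) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The buffer is larger than the record, so a write alone
        // does not push any bytes through to the sink.
        BufferedOutputStream buffered = new BufferedOutputStream(sink, 8192);
        try {
            buffered.write(record, 0, record.length);
            if (flush) {
                buffered.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Number of bytes a reader of the sink could see right now.
        return sink.size();
    }
}
```

Without the flush the sink is still empty after the write, so a reader 
expecting a complete record would hit end of input, which is consistent with 
the EOFException in the trace above. Flushing on every serialize call makes 
the bytes visible at the cost of defeating the buffering.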

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
