Hi,
I am trying to run a map reduce job which parses a text file and fills up a
Hbase Table. Following is the code :
public class UploadMoviesList extends Configured implements Tool
{
public static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, IntWritable, MapWritable>
{
public void map(LongWritable key, Text value,
OutputCollector<IntWritable, MapWritable> output, Reporter reporter) throws
IOException
{
String line = value.toString();
String[] result = line.split("%");
MapWritable mw = new MapWritable();
mw.put(new Text("year:year"), new Text(result[1].toString()));
mw.put(new Text("name:name"), new Text(result[2].toString()));
int a = new Integer(result[3]).intValue();
mw.put(new Text("y_movie_id:y_movie_id"), new IntWritable(a));
int b = new Integer(result[0]).intValue();
output.collect(new IntWritable(b), mw);
}
}
public static class ReduceClass extends TableReduce<IntWritable,
MapWritable>
{
@Override
public void reduce(IntWritable key, Iterator<MapWritable> values,
OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter
reporter) throws IOException
{
reporter.setStatus("Reducer committing " + key);
ImmutableBytesWritable ibw = new
ImmutableBytesWritable(Bytes.toBytes(key.get()));
BatchUpdate outval = new BatchUpdate(Bytes.toBytes(key.get()));
while (values.hasNext())
{
MapWritable hmw = new MapWritable(values.next());
outval.put("year:year",
Bytes.toBytes(hmw.get("year:year").toString()));
outval.put("name:name",
Bytes.toBytes(hmw.get("name:name").toString()));
IntWritable iw = (IntWritable)(hmw.get("y_movie_id:y_movie_id"));
outval.put("y_movie_id:y_movie_id", Bytes.toBytes(iw.get()));
output.collect(ibw,outval);
}
}
}
When I try to run it, I am getting following exceptions :
08/11/28 14:42:27 INFO mapred.JobClient: Task Id :
attempt_200811281158_0005_m_000001_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected
org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved
org.apache.hadoop.io.IntWritable
at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:415)
at dist_q_data.UploadMoviesList$MapClass.map(UploadMoviesList.java:45)
at dist_q_data.UploadMoviesList$MapClass.map(UploadMoviesList.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
I don't know why it says it expects a ImmutableBytesWritable key. Any
suggestions ?
Thanks
--
Nishant Khurana
Candidate for Masters in Engineering (Dec 2009)
Computer and Information Science
School of Engineering and Applied Science
University of Pennsylvania