Hello, 

I recently started to use Hadoop and I have a problem while using a Mapfile as 
a input to a MapReduce job.

The following working code, write a simple MapFile called "TestMap" in hdfs 
where there are three keys of type Text and three value of type BytesWritable

$ hadoop fs  -text /user/hadoop/TestMap/data
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized 
native-zlib library
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor
A       01
B       02
C       03



import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;

public class CreateMap {

    public static void main(String[] args) throws IOException{

        Configuration conf = new Configuration();
        FileSystem hdfs  = FileSystem.get(conf);

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        byte[] data = {1, 2, 3};
        String[] strs = {"A", "B", "C"};
        int bytesRead;
        MapFile.Writer writer = null;
  
        writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), 
value.getClass());
        try {
            for (int i = 0; i < 3; i++) {
                key.set(strs[i]);
                value.set(data, i, 1);
                writer.append(key, value);
                System.out.println(strs[i] + ":" + data[i] + " added.");
            }
        } 
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
             IOUtils.closeStream(writer);
        }
    }
}

The simple MapReduce job that follows try to increment by one the values of the 
mapfile:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.BytesWritable;


public class AddOne extends Configured implements Tool {
    
    public static class MapClass extends MapReduceBase
    
        implements Mapper<Text, BytesWritable, Text, Text> {
        
        public void map(Text key, BytesWritable value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
           
           
            byte[] data = value.getBytes();
            data[0] += 1;
            value.set(data, 0, 1);
            output.collect(key, new Text(value.toString()));
        }
    }
    
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {
        
        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
                           
            output.collect(key, values.next());
        }
    }
    
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        
        JobConf job = new JobConf(conf, AddOne.class);
        
        Path in = new Path("TestMap");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        
        job.setJobName("AddOne");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        
        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
        
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ":");
       
        
        JobClient.runJob(job);
        
        return 0;
    }
    
    public static void main(String[] args) throws Exception { 
        int res = ToolRunner.run(new Configuration(), new AddOne(), args);
        
        System.exit(res);
    }
}

The runtime exception that I get is:
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast 
to org.apache.hadoop.io.BytesWritable
        at AddOne$MapClass.map(AddOne.java:32)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)


I don't understand why hadoop is trying to cast a LongWritable, since in my 
code I define the Mapper interface correctly(Mapper<Text, BytesWritable, Text, 
Text>).

Could somebody help me?

Thank you very much


Luca
 




      

Reply via email to