Quan cheng created PHOENIX-1868:
-----------------------------------

             Summary: Following the Phoenix map/reduce sample code results in a ClassCastException
                 Key: PHOENIX-1868
                 URL: https://issues.apache.org/jira/browse/PHOENIX-1868
             Project: Phoenix
          Issue Type: Test
    Affects Versions: 4.2.0
         Environment: hadoop-2.5.2/ hbase-0.98.8-hadoop2 / phoenix 4.3.1
            Reporter: Quan cheng
             Fix For: 4.2.0


I followed the map/reduce sample code to develop my own map/reduce job, and I always 
encounter a Text cast error:

15/04/15 16:57:43 WARN mapred.LocalJobRunner: job_local1114443330_0001
java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to 
org.apache.hadoop.io.NullWritable
        at 
org.apache.phoenix.mapreduce.PhoenixRecordWriter.write(PhoenixRecordWriter.java:39)
        at 
org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:576)
        at 
org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at 
org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at 
org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:645)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:405)
        at 
org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:445)
15/04/15 16:57:44 INFO mapreduce.Job:  map 100% reduce 0%
15/04/15 16:57:44 INFO mapreduce.Job: Job job_local1114443330_0001 failed with 
state FAILED due to: NA

Here is my code:

public class PMRAnalyzer {
        
        public static class PMRAnalyzerReducer extends Reducer<Text 
,BytesWritable, NullWritable, PMRAnalyzResultWritable>{
                
                protected void reduce(Text key, BytesWritable findings, Context 
context)throws IOException, InterruptedException {
                        
                        Text pmr_token_text = new Text();
                
                        String pmr_token = key.toString();
                        String findings_jsonstr = new 
String(findings.copyBytes());
                        
                        PMRAnalyzResultWritable result = new 
PMRAnalyzResultWritable();
                        
                        JSONUtil util = new JSONUtil();
                        PMRAnalyzeResult resultvo = new PMRAnalyzeResult();
                        try{
                        resultvo = 
(PMRAnalyzeResult)util.readJsonToVO(findings_jsonstr,PMRAnalyzeResult.class);   
                                                     
                        
                        result.setProblem_description(new 
String(Base64.decode(resultvo.getProblem_description())));
                        }catch(Exception e){e.printStackTrace();}
                        
                        result.setFindings(findings_jsonstr);
                        //NullWritable nullwritable = NullWritable.get();
                        
//System.out.println("findings_jsonstr:"+findings_jsonstr);
                        context.write(NullWritable.get(), result);
                        
                        
                }
                
                
        }

        public static class PMRAnalyzerMapper extends
                        Mapper<NullWritable, PMRAnalyzResultWritable, Text, 
BytesWritable> {
        


                protected void map(NullWritable key,
                                PMRAnalyzResultWritable analyzeResultWritable, 
Context context)
                                throws IOException, InterruptedException {
                        final String the_pmr_token = 
analyzeResultWritable.getPMRNO() + "_"
                                        + 
analyzeResultWritable.getCREATED_TIME();
                        final String pmr_text = 
analyzeResultWritable.getPMR_TEXT();
                        
                        Text pmr_token = new Text();
                        Text findings = new Text();
         
                                PMRAnalyzResultWritable resultwritable = new 
PMRAnalyzResultWritable();
                                pmr_token.set(the_pmr_token);
                                //findings.set(findings.toString());
                                PMRParser parser = new PMRParser();
                                String jsonstr = parser.parsePMRText(pmr_text, 
parser);                 
                                findings.set(jsonstr);                          
                                context.write(pmr_token, new 
BytesWritable(jsonstr.getBytes()));

                
                }

        
                
        }
        
        public static void main(String[] args) {
                // TODO Auto-generated method stub
                try{
                final Configuration configuration = HBaseConfiguration.create();
                final Job job = Job.getInstance(configuration, 
"phoenix-mr-job");

                // We can either specify a selectQuery or ignore it when we 
would like to retrieve all the columns
                final String selectQuery = "SELECT PMRNO,CREATED_TIME,PMR_TEXT 
FROM PMRTEXT ";

                // StockWritable is the DBWritable class that enables us to 
process the Result of the above query
                PhoenixMapReduceUtil.setInput(job, 
PMRAnalyzResultWritable.class, "PMRTEXT",  selectQuery);  

                // Set the target Phoenix table and the columns
                PhoenixMapReduceUtil.setOutput(job, "PMR_ANALYZER", 
"PMR_TOKEN,PROBLEM_DESCRIPTION,FINDINGS");

                job.setMapperClass(PMRAnalyzerMapper.class);
                job.setReducerClass(PMRAnalyzerReducer.class); 
                job.setOutputFormatClass(PhoenixOutputFormat.class);

                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(BytesWritable.class);
                job.setOutputKeyClass(NullWritable.class);
                job.setOutputValueClass(PMRAnalyzResultWritable.class); 
                TableMapReduceUtil.addDependencyJars(job);
                job.waitForCompletion(true);
                }catch(Exception e){e.printStackTrace();}

        }


}



 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to