Well, I think you could probably take care of the issue by using a somewhat different architecture.
If I understand correctly, you take all of the values with the same temperature together. This is in fact a Reduce operation. You could structure it as follows:

- Read in like you do now, but make your Mapper simpler. For each map() call (so for each record), write out the temperature as the key and the record as the value.
- Each reducer will then receive a list of records that all have the same temperature. You can sum the entries in the list and write everything out. You will then have one combined result per temperature.

You could then start a second job that has a pass-through Mapper, and do your final calculation in the Reducer. (A sketch of both pieces follows below the thread.)

Does it sound like I'm making sense to some degree? :-)

Mathias

On Wed, Aug 12, 2009 at 2:38 PM, Xine Jar <[email protected]> wrote:

> Aha!! I understand!!
> So basically this is the reason why I am getting 100 written Map output
> records: the mapper is calling collect() on the OutputCollector 100 times,
> once per record in the table.
>
> In this case I assume I have to pass the HBase table, instead of the
> records, as input to the mapper, right? Is there a Java example you could
> point me to?
>
> Regards,
> CJ
>
> 2009/8/12 Mathias De Maré <[email protected]>
>
>> Hi,
>>
>> On Tue, Aug 11, 2009 at 6:27 PM, Xine Jar <[email protected]> wrote:
>>>
>>> A snapshot of the Mapper:
>>>
>>> public void map(ImmutableBytesWritable key, RowResult value,
>>>         OutputCollector<Text, Text> output, Reporter reporter)
>>>         throws IOException {
>>>     double numberreadings = 0;
>>>     double sumreadings = 0;
>>>
>>>     if (table == null)
>>>         throw new IOException("table is null");
>>>
>>>     // set up a scanner
>>>     Scanner scanner = table.getScanner(new String[] {"cf:Value",
>>>             "cf:Type", "cf:TimeStamp", "cf:Latitude", "cf:Longitude",
>>>             "cf:SensorNode"});
>>>     RowResult rowresult = scanner.next();
>>>
>>>     // scan the table, filter out the temperature values, and count them
>>>     while (rowresult != null) {
>>>         String stringtype = new String(
>>>                 rowresult.get(Bytes.toBytes("cf:Type")).getValue());
>>>
>>>         if (stringtype.equals("temperature")) {
>>>             // sum the reading values
>>>             String stringval = new String(
>>>                     rowresult.get(Bytes.toBytes("cf:Value")).getValue());
>>>             double doubleval = Double.parseDouble(stringval.trim());
>>>             sumreadings = sumreadings + doubleval;
>>>
>>>             // count the readings
>>>             numberreadings = numberreadings + 1;
>>>         }
>>>         rowresult = scanner.next();
>>>     }
>>>
>>>     scanner.close();
>>>
>>>     // emit the sum of the values as well as the count
>>>     String strsumreadings = Double.toString(sumreadings);
>>>     String strnumberreadings = Double.toString(numberreadings);
>>>     String strmapoutvalue = strsumreadings + " " + strnumberreadings;
>>>
>>>     mapoutputvalue.set(strmapoutvalue);
>>>     output.collect(mapoutputkey, mapoutputvalue);
>>> }
>>>
>>> Questions:
>>> 1- For 100 records, I noticed that I have 1 map task and 1 reduce task,
>>> and the job finishes after 12 seconds. Whenever I extend the number of
>>> records in the htable to 10,000, I still have 1 map task and 1 reduce
>>> task, and the job finishes after 1 hour!
>>> The mapper is incredibly slow; what is so heavy in my code?
>>
>> From your code, it looks like you are using the HBase records as input
>> for the mapper. Then, for each record, you go through the entire table
>> again, so you do N scans of the HBase table and read N*N records in
>> total. That's what's heavy in your code.
>>
>> Mathias
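For concreteness, here is a minimal sketch of the first job's mapper and reducer in the restructured design, written against the same old-style org.apache.hadoop.mapred and org.apache.hadoop.hbase.mapred API (HBase 0.19/0.20 era) that the quoted code uses. The class names TypeValueMapper and SumCountReducer are made up for illustration, and the exact interfaces may differ between HBase versions:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.RowResult;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // (in TypeValueMapper.java)
    // Job 1 mapper: called once per HBase row, so it must not open its
    // own Scanner. It only inspects the single row it is handed and
    // emits (type, reading) for the rows it cares about.
    public class TypeValueMapper extends MapReduceBase
            implements TableMap<Text, Text> {

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        public void map(ImmutableBytesWritable key, RowResult row,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String type = new String(row.get(Bytes.toBytes("cf:Type")).getValue());
            if (!"temperature".equals(type)) {
                return; // only temperature readings are of interest here
            }
            String reading = new String(row.get(Bytes.toBytes("cf:Value")).getValue());
            outKey.set(type);              // grouping key: all temperature rows
            outValue.set(reading.trim());  // end up in the same reduce() call
            output.collect(outKey, outValue);
        }
    }

    // (in SumCountReducer.java)
    // Job 1 reducer: receives every reading for one key and folds them
    // into a single "sum count" record for the second job to consume.
    public class SumCountReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            double sum = 0;
            long count = 0;
            while (values.hasNext()) {
                sum += Double.parseDouble(values.next().toString());
                count++;
            }
            output.collect(key, new Text(sum + " " + count));
        }
    }

With this split, the table scan happens exactly once, on the framework side, and the second job can use org.apache.hadoop.mapred.lib.IdentityMapper as its pass-through mapper, with a reducer that turns the "sum count" pairs into the final result.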

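As for the question in the middle of the thread: with this structure, the HBase table itself is the mapper's input, wired up in the job driver, which is why map() sees one row per call and never needs a Scanner of its own. A hedged sketch of such a driver, assuming the HBase 0.19/0.20-era helper TableMapReduceUtil.initTableMapJob (the helper's name and location vary across versions, and the table name "sensors" and the output path are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class TemperatureSumJob {
        public static void main(String[] args) throws Exception {
            JobConf job = new JobConf(new HBaseConfiguration(), TemperatureSumJob.class);
            job.setJobName("temperature-sum");

            // Feed the HBase table straight into the mapper: the framework
            // scans the table once and hands each row to map() in turn.
            TableMapReduceUtil.initTableMapJob(
                    "sensors",             // placeholder table name
                    "cf:Type cf:Value",    // only the columns the mapper reads
                    TypeValueMapper.class, Text.class, Text.class, job);

            job.setReducerClass(SumCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path("/tmp/temperature-sums"));

            JobClient.runJob(job);
        }
    }

For 10,000 rows this does one scan of 10,000 rows instead of 10,000 scans of 10,000 rows each, which is where the hour was going.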