How many tasks are you seeing in your mapToPair stage? Is it 7000? then i
suggest you giving a number similar/close to 7000 in your .distinct call,
what is happening in your case is that, you are repartitioning your data to
a smaller number (32) which would put a lot of load on processing i
believe, you can try increasing it.

Thanks
Best Regards

On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele <gangele...@gmail.com>
wrote:

> Akhil, any thought on this?
>
> On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com> wrote:
>
>> No I did not tried the partitioning below is the full code
>>
>> public static  void  matchAndMerge(JavaRDD<VendorRecord>
>> matchRdd,JavaSparkContext jsc) throws IOException{
>>  long start = System.currentTimeMillis();
>>   JavaPairRDD<Long, MatcherReleventData> RddForMarch
>> =matchRdd.zipWithIndex().mapToPair(new
>> PairFunction<Tuple2<VendorRecord,Long>, Long, MatcherReleventData>() {
>>
>> @Override
>> public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long>
>> t)
>> throws Exception {
>> MatcherReleventData matcherData = new MatcherReleventData();
>> Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long,
>> MatcherReleventData>(t._2,
>> matcherData.convertVendorDataToMatcherData(t._1));
>>  return tuple;
>> }
>>
>> }).cache();
>>  log.info("after index"+RddForMarch.take(1));
>>  Map<Long, MatcherReleventData> tmp =RddForMarch.collectAsMap();
>> Map<Long, MatcherReleventData> matchData = new HashMap<Long,
>> MatcherReleventData>(tmp);
>> final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
>> jsc.broadcast(matchData);
>>
>> JavaPairRDD<Long,String> blockingRdd = RddForMarch.flatMapValues(new
>> Function<MatcherReleventData, Iterable<String>>(){
>>
>> @Override
>> public Iterable<String> call(MatcherReleventData v1)
>> throws Exception {
>> List<String> values = new ArrayList<String>();
>> HelperUtilities helper1 = new HelperUtilities();
>> MatcherKeys matchkeys=helper1.getBlockinkeys(v1);
>> if(matchkeys.get_companyName() !=null){
>> values.add(matchkeys.get_companyName());
>> }
>> if(matchkeys.get_phoneNumberr() !=null){
>> values.add(matchkeys.get_phoneNumberr());
>> }
>> if(matchkeys.get_zipCode() !=null){
>> values.add(matchkeys.get_zipCode());
>> }
>> if(matchkeys.getM_domain() !=null){
>> values.add(matchkeys.getM_domain());
>> }
>>   return values;
>> }
>>  });
>>  log.info("blocking RDD is"+blockingRdd.count());
>> int count=0;
>> log.info("Starting printing");
>>   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>>
>>   log.info(entry._1() + ":" + entry._2());
>>       count++;
>>     }
>>   log.info("total count"+count);
>>  JavaPairRDD<Long,Integer>
>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
>> Iterable<Integer>>(){
>>
>> @Override
>> public Iterable<Integer> call(String v1) throws Exception {
>> return ckdao.getSingelkeyresult(v1);
>> }
>>  }).distinct(32);
>>  log.info("after hbase count is"+completeDataToprocess.count());
>>  log.info("data for process"+completeDataToprocess.take(1));
>>  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore
>> =completeDataToprocess.mapToPair( new PairFunction<Tuple2<Long,Integer>,
>> Long, Tuple2<Integer, Double>>(){
>>
>> @Override
>> public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
>> throws Exception {
>> Scoring scoreObj = new Scoring();
>> double score =scoreObj.computeMatchScore(companyDAO.get(t._2()),
>> dataMatchGlobal.getValue().get(t._1()));
>> Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(),
>> score);
>> Tuple2<Long, Tuple2<Integer, Double>> tuple = new Tuple2<Long,
>> Tuple2<Integer,Double>>(t._1(), maptuple);
>> return tuple;
>> }
>>  });
>>  log.info("with score tuple is"+withScore.take(1));
>>  JavaPairRDD<Long, Tuple2<Integer,Double>> maxScoreRDD
>> =withScore.reduceByKey( new Function2<Tuple2<Integer,Double>,
>> Tuple2<Integer,Double>, Tuple2<Integer,Double>>(){
>>
>> @Override
>> public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
>> Tuple2<Integer, Double> v2) throws Exception {
>>  int res =v1._2().compareTo(v2._2());
>> if(res >0){
>>  Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v1._1(),
>> v1._2());
>> return result;
>>  }
>> else if(res<0){
>> Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
>> v2._2());
>> return result;
>> }
>> else{
>> Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
>> v2._2());
>> return result;
>> }
>>   }
>>  });
>>  log.info("max score RDD"+maxScoreRDD.take(10));
>>
>>  maxScoreRDD.foreach( new
>> VoidFunction<Tuple2<Long,Tuple2<Integer,Double>>>(){
>>
>> @Override
>> public void call(Tuple2<Long, Tuple2<Integer, Double>> t)
>> throws Exception {
>> MatcherReleventData matchedData=dataMatchGlobal.getValue().get(t._1());
>> log.info("broadcast is"+dataMatchGlobal.getValue().get(t._1()));
>> //Set the score for better understanding of merge
>> matchedData.setScore(t._2()._2());
>> vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),"Souce_id");
>>  }
>>  });
>>  log.info("took " + (System.currentTimeMillis() - start) + " mills to
>> run matcher");
>>
>>
>>
>>  }
>>
>>
>> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Can you paste your complete code? Did you try repartioning/increasing
>>> level of parallelism to speed up the processing. Since you have 16 cores,
>>> and I'm assuming your 400k records isn't bigger than a 10G dataset.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <
>>> gangele...@gmail.com> wrote:
>>>
>>>> I already checked and G is taking 1 secs for each task. is this too
>>>> much? if yes how to avoid this?
>>>>
>>>>
>>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Open the driver ui and see which stage is taking time, you can look
>>>>> whether its adding any GC time etc.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <
>>>>> gangele...@gmail.com> wrote:
>>>>>
>>>>>> Hi All I have below code whether distinct is running for more time.
>>>>>>
>>>>>> blockingRdd is the combination of <Long,String> and it will have 400K
>>>>>> records
>>>>>> JavaPairRDD<Long,Integer>
>>>>>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
>>>>>> Iterable<Integer>>(){
>>>>>>
>>>>>> @Override
>>>>>> public Iterable<Integer> call(String v1) throws Exception {
>>>>>> return ckdao.getSingelkeyresult(v1);
>>>>>> }
>>>>>>  }).distinct(32);
>>>>>>
>>>>>> I am running distinct on 800K records and its taking 2 hours on 16
>>>>>> cores and 20 GB RAM.
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>
>
>
>

Reply via email to