I set 7000 tasks in mapToPair, and in distinct I also used the same number of
tasks.
Still, a lot of shuffle read and write is happening and the application is
running for a much longer time.
Any idea?

On 17 April 2015 at 11:55, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> How many tasks are you seeing in your mapToPair stage? Is it 7000? If so, I
> suggest giving a number similar/close to 7000 in your .distinct call. What is
> happening in your case is that you are repartitioning your data down to a
> much smaller number (32), which I believe puts a lot of load on the
> processing; you can try increasing it.
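>
> For example, a minimal sketch along those lines (reusing the blockingRdd and
> lookup function from your code below; the 7000 is only an illustrative value
> chosen to match the upstream stage's task count):
>
>     JavaPairRDD<Long, Integer> completeDataToprocess =
>         blockingRdd.flatMapValues(new Function<String, Iterable<Integer>>() {
>           @Override
>           public Iterable<Integer> call(String v1) throws Exception {
>             return ckdao.getSingelkeyresult(v1);
>           }
>         }).distinct(7000); // numPartitions used for distinct's shuffle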
>
> Thanks
> Best Regards
>
> On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele <gangele...@gmail.com>
> wrote:
>
>> Akhil, any thought on this?
>>
>> On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com>
>> wrote:
>>
>>> No, I did not try the partitioning. Below is the full code:
>>>
>>> public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd,
>>>     JavaSparkContext jsc) throws IOException {
>>>   long start = System.currentTimeMillis();
>>>   JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd
>>>       .zipWithIndex()
>>>       .mapToPair(new PairFunction<Tuple2<VendorRecord, Long>,
>>>           Long, MatcherReleventData>() {
>>>         @Override
>>>         public Tuple2<Long, MatcherReleventData> call(
>>>             Tuple2<VendorRecord, Long> t) throws Exception {
>>>           MatcherReleventData matcherData = new MatcherReleventData();
>>>           return new Tuple2<Long, MatcherReleventData>(t._2,
>>>               matcherData.convertVendorDataToMatcherData(t._1));
>>>         }
>>>       }).cache();
>>>  log.info("after index"+RddForMarch.take(1));
>>>  Map<Long, MatcherReleventData> tmp =RddForMarch.collectAsMap();
>>> Map<Long, MatcherReleventData> matchData = new HashMap<Long,
>>> MatcherReleventData>(tmp);
>>> final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
>>> jsc.broadcast(matchData);
>>>
>>>   JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
>>>       new Function<MatcherReleventData, Iterable<String>>() {
>>>         @Override
>>>         public Iterable<String> call(MatcherReleventData v1) throws Exception {
>>>           List<String> values = new ArrayList<String>();
>>>           HelperUtilities helper1 = new HelperUtilities();
>>>           MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
>>>           if (matchkeys.get_companyName() != null) {
>>>             values.add(matchkeys.get_companyName());
>>>           }
>>>           if (matchkeys.get_phoneNumberr() != null) {
>>>             values.add(matchkeys.get_phoneNumberr());
>>>           }
>>>           if (matchkeys.get_zipCode() != null) {
>>>             values.add(matchkeys.get_zipCode());
>>>           }
>>>           if (matchkeys.getM_domain() != null) {
>>>             values.add(matchkeys.getM_domain());
>>>           }
>>>           return values;
>>>         }
>>>       });
>>>  log.info("blocking RDD is"+blockingRdd.count());
>>> int count=0;
>>> log.info("Starting printing");
>>>   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>>>
>>>   log.info(entry._1() + ":" + entry._2());
>>>       count++;
>>>     }
>>>   log.info("total count"+count);
>>>   JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
>>>       new Function<String, Iterable<Integer>>() {
>>>         @Override
>>>         public Iterable<Integer> call(String v1) throws Exception {
>>>           return ckdao.getSingelkeyresult(v1);
>>>         }
>>>       }).distinct(32);
>>>   log.info("after hbase count is" + completeDataToprocess.count());
>>>   log.info("data for process" + completeDataToprocess.take(1));
>>>   JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess
>>>       .mapToPair(new PairFunction<Tuple2<Long, Integer>,
>>>           Long, Tuple2<Integer, Double>>() {
>>>         @Override
>>>         public Tuple2<Long, Tuple2<Integer, Double>> call(
>>>             Tuple2<Long, Integer> t) throws Exception {
>>>           Scoring scoreObj = new Scoring();
>>>           double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
>>>               dataMatchGlobal.getValue().get(t._1()));
>>>           return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(),
>>>               new Tuple2<Integer, Double>(t._2(), score));
>>>         }
>>>       });
>>>   log.info("with score tuple is" + withScore.take(1));
>>>   JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore
>>>       .reduceByKey(new Function2<Tuple2<Integer, Double>,
>>>           Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
>>>         @Override
>>>         public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
>>>             Tuple2<Integer, Double> v2) throws Exception {
>>>           int res = v1._2().compareTo(v2._2());
>>>           if (res > 0) {
>>>             return new Tuple2<Integer, Double>(v1._1(), v1._2());
>>>           } else if (res < 0) {
>>>             return new Tuple2<Integer, Double>(v2._1(), v2._2());
>>>           } else {
>>>             return new Tuple2<Integer, Double>(v2._1(), v2._2());
>>>           }
>>>         }
>>>       });
>>>   log.info("max score RDD" + maxScoreRDD.take(10));
>>>
>>>   maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
>>>     @Override
>>>     public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
>>>       MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
>>>       log.info("broadcast is" + dataMatchGlobal.getValue().get(t._1()));
>>>       // Set the score for better understanding of merge
>>>       matchedData.setScore(t._2()._2());
>>>       vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), "Souce_id");
>>>     }
>>>   });
>>>   log.info("took " + (System.currentTimeMillis() - start) + " mills to run matcher");
>>> }
>>>
>>>
>>> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Can you paste your complete code? Did you try repartitioning/increasing
>>>> the level of parallelism to speed up the processing? You have 16 cores,
>>>> and I'm assuming your 400k records aren't bigger than a 10G dataset.
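>>>>
>>>> For instance, a minimal sketch of what I mean (the 64 and the variable name
>>>> spreadOut are only illustrative values, a few times your 16 cores):
>>>>
>>>>     // raise the default shuffle parallelism when building the context
>>>>     SparkConf conf = new SparkConf()
>>>>         .set("spark.default.parallelism", "64");
>>>>
>>>>     // or repartition the input RDD before the heavy stages
>>>>     JavaRDD<VendorRecord> spreadOut = matchRdd.repartition(64);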
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <
>>>> gangele...@gmail.com> wrote:
>>>>
>>>>> I already checked, and GC is taking 1 second for each task. Is this too
>>>>> much? If yes, how do I avoid it?
>>>>>
>>>>>
>>>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com>
>>>>> wrote:
>>>>>
>>>>>> Open the driver UI and see which stage is taking time; you can also look
>>>>>> at whether it is adding any GC time, etc.
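>>>>>>
>>>>>> If the per-task GC time shown in the UI isn't detailed enough, one option
>>>>>> (just a sketch using the standard JVM GC-logging flags) is to enable GC
>>>>>> logging on the executors and read it from their logs in the Executors tab:
>>>>>>
>>>>>>     SparkConf conf = new SparkConf()
>>>>>>         .set("spark.executor.extraJavaOptions",
>>>>>>             "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");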
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <
>>>>>> gangele...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all, I have the code below where distinct is running for a long time.
>>>>>>>
>>>>>>> blockingRdd is a pair RDD of <Long, String> and it will have about 400K
>>>>>>> records:
>>>>>>> JavaPairRDD<Long, Integer> completeDataToprocess =
>>>>>>>     blockingRdd.flatMapValues(new Function<String, Iterable<Integer>>() {
>>>>>>>       @Override
>>>>>>>       public Iterable<Integer> call(String v1) throws Exception {
>>>>>>>         return ckdao.getSingelkeyresult(v1);
>>>>>>>       }
>>>>>>>     }).distinct(32);
>>>>>>>
>>>>>>> I am running distinct on 800K records and it's taking 2 hours on 16
>>>>>>> cores and 20 GB RAM.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
