How many tasks are you seeing in your mapToPair stage? If it is around 7000, then I suggest passing a number close to 7000 to your .distinct call. What is happening in your case, I believe, is that you are repartitioning your data down to a much smaller number of partitions (32), which puts a heavy load on each task. You can try increasing it.
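For example, reusing your own snippet with only the partition count changed (a sketch; the 7000 here is an assumption based on the task count you see in the UI, so substitute whatever your mapToPair stage actually shows):

JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd
        .flatMapValues(new Function<String, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> call(String v1) throws Exception {
                return ckdao.getSingelkeyresult(v1);
            }
        })
        .distinct(7000); // keep the shuffle at roughly the upstream stage's parallelism

You can also set spark.default.parallelism in your SparkConf, in which case distinct() without an explicit argument will pick that up. There is also a note on surfacing GC time at the bottom of this mail.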
Thanks
Best Regards

On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele <gangele...@gmail.com> wrote:

> Akhil, any thought on this?
>
> On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com> wrote:
>
>> No, I have not tried the partitioning. Below is the full code:
>>
>> public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
>>     long start = System.currentTimeMillis();
>>
>>     // Pair every record with a unique index and convert it to matcher-relevant data.
>>     JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(
>>         new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
>>             @Override
>>             public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
>>                 MatcherReleventData matcherData = new MatcherReleventData();
>>                 return new Tuple2<Long, MatcherReleventData>(t._2,
>>                         matcherData.convertVendorDataToMatcherData(t._1));
>>             }
>>         }).cache();
>>     log.info("after index " + RddForMarch.take(1));
>>
>>     // Collect the indexed data on the driver and broadcast it for lookups by index.
>>     Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
>>     Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
>>     final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);
>>
>>     // Expand each record into its non-null blocking keys (company name, phone, zip, domain).
>>     JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
>>         new Function<MatcherReleventData, Iterable<String>>() {
>>             @Override
>>             public Iterable<String> call(MatcherReleventData v1) throws Exception {
>>                 List<String> values = new ArrayList<String>();
>>                 HelperUtilities helper1 = new HelperUtilities();
>>                 MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
>>                 if (matchkeys.get_companyName() != null) {
>>                     values.add(matchkeys.get_companyName());
>>                 }
>>                 if (matchkeys.get_phoneNumberr() != null) {
>>                     values.add(matchkeys.get_phoneNumberr());
>>                 }
>>                 if (matchkeys.get_zipCode() != null) {
>>                     values.add(matchkeys.get_zipCode());
>>                 }
>>                 if (matchkeys.getM_domain() != null) {
>>                     values.add(matchkeys.getM_domain());
>>                 }
>>                 return values;
>>             }
>>         });
>>     log.info("blocking RDD is " + blockingRdd.count());
>>
>>     int count = 0;
>>     log.info("Starting printing");
>>     for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>>         log.info(entry._1() + ":" + entry._2());
>>         count++;
>>     }
>>     log.info("total count " + count);
>>
>>     // Look up the candidate ids for each blocking key, then deduplicate.
>>     JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
>>         new Function<String, Iterable<Integer>>() {
>>             @Override
>>             public Iterable<Integer> call(String v1) throws Exception {
>>                 return ckdao.getSingelkeyresult(v1);
>>             }
>>         }).distinct(32);
>>     log.info("after hbase count is " + completeDataToprocess.count());
>>     log.info("data for process " + completeDataToprocess.take(1));
>>
>>     // Score each candidate against the broadcast record it was blocked with.
>>     JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
>>         new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
>>             @Override
>>             public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t) throws Exception {
>>                 Scoring scoreObj = new Scoring();
>>                 double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
>>                         dataMatchGlobal.getValue().get(t._1()));
>>                 return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(),
>>                         new Tuple2<Integer, Double>(t._2(), score));
>>             }
>>         });
>>     log.info("with score tuple is " + withScore.take(1));
>>
>>     // Keep only the highest-scoring candidate per record.
>>     JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
>>         new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
>>             @Override
>>             public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
>>                     Tuple2<Integer, Double> v2) throws Exception {
>>                 // Ties fall through to v2, matching the original else branch.
>>                 return v1._2().compareTo(v2._2()) > 0 ? v1 : v2;
>>             }
>>         });
>>     log.info("max score RDD " + maxScoreRDD.take(10));
>>
>>     maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
>>         @Override
>>         public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
>>             MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
>>             log.info("broadcast is " + dataMatchGlobal.getValue().get(t._1()));
>>             // Set the score for better understanding of the merge.
>>             matchedData.setScore(t._2()._2());
>>             vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), "Souce_id");
>>         }
>>     });
>>     log.info("took " + (System.currentTimeMillis() - start) + " millis to run matcher");
>> }
>>
>> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Can you paste your complete code? Did you try repartitioning/increasing the
>>> level of parallelism to speed up the processing? You have 16 cores, and
>>> I'm assuming your 400k records aren't bigger than a 10 GB dataset.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:
>>>
>>>> I already checked, and GC is taking 1 sec for each task. Is this too
>>>> much? If yes, how do I avoid it?
>>>>
>>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Open the driver UI and see which stage is taking the time; you can check
>>>>> whether it is adding any GC time, etc.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:
>>>>>
>>>>>> Hi all, I have the code below where distinct is running for a long time.
>>>>>>
>>>>>> blockingRdd is a JavaPairRDD<Long, String> and it will have 400K records:
>>>>>>
>>>>>> JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
>>>>>>     new Function<String, Iterable<Integer>>() {
>>>>>>         @Override
>>>>>>         public Iterable<Integer> call(String v1) throws Exception {
>>>>>>             return ckdao.getSingelkeyresult(v1);
>>>>>>         }
>>>>>>     }).distinct(32);
>>>>>>
>>>>>> I am running distinct on 800K records and it is taking 2 hours on 16
>>>>>> cores and 20 GB of RAM.
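One more thing, on the GC point from earlier in the thread: 1 second of GC per task is significant. A minimal sketch of how you could surface GC details in the executor logs (these are standard HotSpot flags passed through Spark's executor options; the app name is hypothetical, adjust to your setup):

// Driver-side setup, before any RDD work:
SparkConf conf = new SparkConf()
        .setAppName("matcher") // hypothetical app name
        .set("spark.executor.extraJavaOptions",
                "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");
JavaSparkContext jsc = new JavaSparkContext(conf);

If the executor logs then show long full GCs, more executor memory or more (hence smaller) partitions usually helps.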