I created 7000 tasks in mapToPair, and the same number of tasks in distinct, yet a lot of shuffle read and write is still happening and the application runs for a much longer time. Any idea?
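For context on why the partition count passed to distinct matters: in Spark, RDD.distinct(n) is implemented as a map to (item, null) pairs followed by reduceByKey with n output partitions, so n directly sets how many reduce tasks share the shuffled data. Below is a minimal plain-Java sketch (no Spark; partitionFor mimics HashPartitioner's key-hash routing, and the class name and record counts are illustrative, not from the thread) of how per-task load shrinks as the partition count grows:

```java
import java.util.Arrays;

public class PartitionSketch {

    // Mimics Spark's HashPartitioner: route a key to a partition by
    // taking the non-negative hash modulo the partition count.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        int records = 800_000; // roughly the record count from the thread
        for (int n : new int[] {32, 7000}) {
            long[] counts = new long[n];
            for (int i = 0; i < records; i++) {
                counts[partitionFor("key-" + i, n)]++;
            }
            long max = Arrays.stream(counts).max().getAsLong();
            // With 32 partitions each reduce task handles ~25,000 records;
            // with 7000 it handles ~114, so each task finishes far sooner.
            System.out.println(n + " partitions -> max " + max
                    + " records in one task");
        }
    }
}
```

The total shuffle volume does not shrink, but smaller tasks mean less per-task memory pressure and shorter GC pauses, which matches the advice below to raise the 32 toward the ~7000 tasks used upstream.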
On 17 April 2015 at 11:55, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I
> suggest giving a number similar/close to 7000 in your .distinct call. What
> is happening in your case is that you are repartitioning your data to a
> smaller number (32), which I believe puts a lot of load on processing; you
> can try increasing it.
>
> Thanks
> Best Regards
>
> On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele <gangele...@gmail.com>
> wrote:
>
>> Akhil, any thought on this?
>>
>> On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com>
>> wrote:
>>
>>> No, I did not try the partitioning. Below is the full code:
>>>
>>> public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd,
>>>     JavaSparkContext jsc) throws IOException {
>>>   long start = System.currentTimeMillis();
>>>   JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd
>>>       .zipWithIndex()
>>>       .mapToPair(new PairFunction<Tuple2<VendorRecord, Long>, Long,
>>>           MatcherReleventData>() {
>>>         @Override
>>>         public Tuple2<Long, MatcherReleventData> call(
>>>             Tuple2<VendorRecord, Long> t) throws Exception {
>>>           MatcherReleventData matcherData = new MatcherReleventData();
>>>           return new Tuple2<Long, MatcherReleventData>(t._2,
>>>               matcherData.convertVendorDataToMatcherData(t._1));
>>>         }
>>>       }).cache();
>>>   log.info("after index " + RddForMarch.take(1));
>>>
>>>   Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
>>>   Map<Long, MatcherReleventData> matchData =
>>>       new HashMap<Long, MatcherReleventData>(tmp);
>>>   final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
>>>       jsc.broadcast(matchData);
>>>
>>>   JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
>>>       new Function<MatcherReleventData, Iterable<String>>() {
>>>         @Override
>>>         public Iterable<String> call(MatcherReleventData v1)
>>>             throws Exception {
>>>           List<String> values = new ArrayList<String>();
>>>           HelperUtilities helper1 = new HelperUtilities();
>>>           MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
>>>           if (matchkeys.get_companyName() != null) {
>>>             values.add(matchkeys.get_companyName());
>>>           }
>>>           if (matchkeys.get_phoneNumberr() != null) {
>>>             values.add(matchkeys.get_phoneNumberr());
>>>           }
>>>           if (matchkeys.get_zipCode() != null) {
>>>             values.add(matchkeys.get_zipCode());
>>>           }
>>>           if (matchkeys.getM_domain() != null) {
>>>             values.add(matchkeys.getM_domain());
>>>           }
>>>           return values;
>>>         }
>>>       });
>>>   log.info("blocking RDD is " + blockingRdd.count());
>>>
>>>   int count = 0;
>>>   log.info("Starting printing");
>>>   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>>>     log.info(entry._1() + ":" + entry._2());
>>>     count++;
>>>   }
>>>   log.info("total count " + count);
>>>
>>>   JavaPairRDD<Long, Integer> completeDataToprocess =
>>>       blockingRdd.flatMapValues(
>>>           new Function<String, Iterable<Integer>>() {
>>>             @Override
>>>             public Iterable<Integer> call(String v1) throws Exception {
>>>               return ckdao.getSingelkeyresult(v1);
>>>             }
>>>           }).distinct(32);
>>>   log.info("after hbase count is " + completeDataToprocess.count());
>>>   log.info("data for process " + completeDataToprocess.take(1));
>>>
>>>   JavaPairRDD<Long, Tuple2<Integer, Double>> withScore =
>>>       completeDataToprocess.mapToPair(
>>>           new PairFunction<Tuple2<Long, Integer>, Long,
>>>               Tuple2<Integer, Double>>() {
>>>             @Override
>>>             public Tuple2<Long, Tuple2<Integer, Double>> call(
>>>                 Tuple2<Long, Integer> t) throws Exception {
>>>               Scoring scoreObj = new Scoring();
>>>               double score = scoreObj.computeMatchScore(
>>>                   companyDAO.get(t._2()),
>>>                   dataMatchGlobal.getValue().get(t._1()));
>>>               Tuple2<Integer, Double> maptuple =
>>>                   new Tuple2<Integer, Double>(t._2(), score);
>>>               return new Tuple2<Long, Tuple2<Integer, Double>>(
>>>                   t._1(), maptuple);
>>>             }
>>>           });
>>>   log.info("with score tuple is " + withScore.take(1));
>>>
>>>   JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD =
>>>       withScore.reduceByKey(
>>>           new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>,
>>>               Tuple2<Integer, Double>>() {
>>>             @Override
>>>             public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
>>>                 Tuple2<Integer, Double> v2) throws Exception {
>>>               int res = v1._2().compareTo(v2._2());
>>>               if (res > 0) {
>>>                 return new Tuple2<Integer, Double>(v1._1(), v1._2());
>>>               } else if (res < 0) {
>>>                 return new Tuple2<Integer, Double>(v2._1(), v2._2());
>>>               } else {
>>>                 return new Tuple2<Integer, Double>(v2._1(), v2._2());
>>>               }
>>>             }
>>>           });
>>>   log.info("max score RDD " + maxScoreRDD.take(10));
>>>
>>>   maxScoreRDD.foreach(
>>>       new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
>>>         @Override
>>>         public void call(Tuple2<Long, Tuple2<Integer, Double>> t)
>>>             throws Exception {
>>>           MatcherReleventData matchedData =
>>>               dataMatchGlobal.getValue().get(t._1());
>>>           log.info("broadcast is " + dataMatchGlobal.getValue().get(t._1()));
>>>           // Set the score for better understanding of the merge
>>>           matchedData.setScore(t._2()._2());
>>>           vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),
>>>               "Souce_id");
>>>         }
>>>       });
>>>   log.info("took " + (System.currentTimeMillis() - start)
>>>       + " millis to run matcher");
>>> }
>>>
>>> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Can you paste your complete code? Did you try repartitioning/increasing
>>>> the level of parallelism to speed up the processing? Since you have 16
>>>> cores, I'm assuming your 400k records aren't bigger than a 10 GB dataset.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <
>>>> gangele...@gmail.com> wrote:
>>>>
>>>>> I already checked, and GC is taking 1 sec for each task. Is this too
>>>>> much? If yes, how do I avoid it?
>>>>>
>>>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com>
>>>>> wrote:
>>>>>
>>>>>> Open the driver UI and see which stage is taking time; you can look at
>>>>>> whether it is adding any GC time etc.
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <
>>>>>> gangele...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all, I have the code below, where distinct runs for a long time.
>>>>>>>
>>>>>>> blockingRdd is a pair RDD of <Long, String> and it will have 400K
>>>>>>> records:
>>>>>>>
>>>>>>> JavaPairRDD<Long, Integer> completeDataToprocess =
>>>>>>>     blockingRdd.flatMapValues(
>>>>>>>         new Function<String, Iterable<Integer>>() {
>>>>>>>           @Override
>>>>>>>           public Iterable<Integer> call(String v1) throws Exception {
>>>>>>>             return ckdao.getSingelkeyresult(v1);
>>>>>>>           }
>>>>>>>         }).distinct(32);
>>>>>>>
>>>>>>> I am running distinct on 800K records, and it is taking 2 hours on
>>>>>>> 16 cores and 20 GB of RAM.
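As a side note on the code quoted above: the three-branch compareTo in the reduceByKey combiner always returns whichever (id, score) pair has the higher score (and v2 on a tie), so it collapses to a single max-by-score expression. Below is a minimal plain-Java sketch of the same logic, using Map.Entry in place of Spark's Tuple2 so it runs without a Spark dependency (the class and method names are my own, not from the thread):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class MaxScoreCombiner {

    // Equivalent of the Function2 passed to reduceByKey: keep the
    // (candidateId, score) pair with the higher score. Ties keep v2,
    // matching the original's else branch.
    static Map.Entry<Integer, Double> maxByScore(Map.Entry<Integer, Double> v1,
                                                 Map.Entry<Integer, Double> v2) {
        return v1.getValue().compareTo(v2.getValue()) > 0 ? v1 : v2;
    }

    public static void main(String[] args) {
        Map.Entry<Integer, Double> a = new SimpleEntry<>(101, 0.72);
        Map.Entry<Integer, Double> b = new SimpleEntry<>(202, 0.91);
        // b has the higher score, so its id wins
        System.out.println(maxByScore(a, b).getKey()); // prints 202
    }
}
```

Returning one of the existing tuples instead of allocating a new one on every merge also saves a little allocation and GC pressure, which seems relevant given the 1-second-per-task GC times mentioned in the thread.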