Akhil, any thoughts on this?

On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com> wrote:
> No, I did not try the partitioning. Below is the full code:
>
> public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd,
>     JavaSparkContext jsc) throws IOException {
>   long start = System.currentTimeMillis();
>
>   JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex()
>       .mapToPair(new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
>         @Override
>         public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t)
>             throws Exception {
>           MatcherReleventData matcherData = new MatcherReleventData();
>           return new Tuple2<Long, MatcherReleventData>(t._2,
>               matcherData.convertVendorDataToMatcherData(t._1));
>         }
>       }).cache();
>   log.info("after index" + RddForMarch.take(1));
>
>   Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
>   Map<Long, MatcherReleventData> matchData =
>       new HashMap<Long, MatcherReleventData>(tmp);
>   final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
>       jsc.broadcast(matchData);
>
>   JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
>       new Function<MatcherReleventData, Iterable<String>>() {
>         @Override
>         public Iterable<String> call(MatcherReleventData v1) throws Exception {
>           List<String> values = new ArrayList<String>();
>           HelperUtilities helper1 = new HelperUtilities();
>           MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
>           if (matchkeys.get_companyName() != null) {
>             values.add(matchkeys.get_companyName());
>           }
>           if (matchkeys.get_phoneNumberr() != null) {
>             values.add(matchkeys.get_phoneNumberr());
>           }
>           if (matchkeys.get_zipCode() != null) {
>             values.add(matchkeys.get_zipCode());
>           }
>           if (matchkeys.getM_domain() != null) {
>             values.add(matchkeys.getM_domain());
>           }
>           return values;
>         }
>       });
>   log.info("blocking RDD is" + blockingRdd.count());
>
>   int count = 0;
>   log.info("Starting printing");
>   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>     log.info(entry._1() + ":" + entry._2());
>     count++;
>   }
>   log.info("total count" + count);
>
>   JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
>       new Function<String, Iterable<Integer>>() {
>         @Override
>         public Iterable<Integer> call(String v1) throws Exception {
>           return ckdao.getSingelkeyresult(v1);
>         }
>       }).distinct(32);
>   log.info("after hbase count is" + completeDataToprocess.count());
>   log.info("data for process" + completeDataToprocess.take(1));
>
>   JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess
>       .mapToPair(new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
>         @Override
>         public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
>             throws Exception {
>           Scoring scoreObj = new Scoring();
>           double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
>               dataMatchGlobal.getValue().get(t._1()));
>           return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(),
>               new Tuple2<Integer, Double>(t._2(), score));
>         }
>       });
>   log.info("with score tuple is" + withScore.take(1));
>
>   JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
>       new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>,
>           Tuple2<Integer, Double>>() {
>         @Override
>         public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
>             Tuple2<Integer, Double> v2) throws Exception {
>           // Keep the candidate with the higher score; on a tie, keep v2.
>           return v1._2().compareTo(v2._2()) > 0 ? v1 : v2;
>         }
>       });
>   log.info("max score RDD" + maxScoreRDD.take(10));
>
>   maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
>     @Override
>     public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
>       MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
>       log.info("broadcast is" + dataMatchGlobal.getValue().get(t._1()));
>       // Set the score for better understanding of the merge
>       matchedData.setScore(t._2()._2());
>       vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), "Souce_id");
>     }
>   });
>
>   log.info("took " + (System.currentTimeMillis() - start) + " millis to run matcher");
> }
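A side note on the code above: collectAsMap() pulls every record to the driver, and the broadcast then ships the whole map back to every executor. Since RddForMarch is already cached, the broadcast lookups can instead be expressed as a join, which keeps the data distributed. A minimal sketch of the final update step, reusing the names from the post (the scoring step, which also reads the broadcast, can be joined the same way first):

    // Sketch: join on the cached RddForMarch instead of broadcasting a
    // collectAsMap() of the whole dataset through the driver.
    maxScoreRDD.join(RddForMarch).foreach(
        new VoidFunction<Tuple2<Long, Tuple2<Tuple2<Integer, Double>, MatcherReleventData>>>() {
          @Override
          public void call(Tuple2<Long, Tuple2<Tuple2<Integer, Double>, MatcherReleventData>> t)
              throws Exception {
            MatcherReleventData matchedData = t._2()._2();  // value from RddForMarch
            Tuple2<Integer, Double> best = t._2()._1();     // (candidate id, score)
            matchedData.setScore(best._2());
            vdDoa.updateMatchedRecordWithScore(matchedData, best._1(), "Souce_id");
          }
        });

With 400k records the broadcast may still fit in memory, but the join avoids the driver round-trip entirely and scales past what the driver heap can hold.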
> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> Can you paste your complete code? Did you try repartitioning/increasing the
>> level of parallelism to speed up the processing? You have 16 cores, and I'm
>> assuming your 400k records aren't bigger than a 10G dataset.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:
>>
>>> I already checked, and GC is taking 1 sec for each task. Is this too much?
>>> If yes, how do I avoid it?
>>>
>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Open the driver UI and see which stage is taking the time; you can also
>>>> check whether it's adding any GC time etc.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:
>>>>
>>>>> Hi all, I have the code below where distinct is running for a long time.
>>>>>
>>>>> blockingRdd is a JavaPairRDD<Long, String> and it will have 400K records:
>>>>>
>>>>> JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
>>>>>     new Function<String, Iterable<Integer>>() {
>>>>>       @Override
>>>>>       public Iterable<Integer> call(String v1) throws Exception {
>>>>>         return ckdao.getSingelkeyresult(v1);
>>>>>       }
>>>>>     }).distinct(32);
>>>>>
>>>>> I am running distinct on 800K records and it's taking 2 hours on 16
>>>>> cores and 20 GB of RAM.
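On the distinct question: each count()/take()/collect() in the posted method is a separate action, and only RddForMarch is cached, so every action re-runs the whole lineage, including all the HBase lookups, and that work gets attributed to the "distinct" stage. One way to rule that out is to persist the expensive RDD and raise the parallelism; a sketch, keeping the original lookup as-is (the partition counts are guesses to tune against the 16 cores):

    import org.apache.spark.storage.StorageLevel;

    JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd
        .repartition(64)  // more tasks than cores so stragglers even out
        .flatMapValues(new Function<String, Iterable<Integer>>() {
          @Override
          public Iterable<Integer> call(String v1) throws Exception {
            return ckdao.getSingelkeyresult(v1);
          }
        })
        .distinct(64)
        // Computed once, then reused by count(), take(1), and the scoring step.
        .persist(StorageLevel.MEMORY_AND_DISK());

Dropping the debug-only collect() over blockingRdd would also help, since it copies all 400K pairs to the driver on every run.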
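If the time is going into ckdao.getSingelkeyresult itself, note that flatMapValues issues one HBase round-trip per record. A per-partition variant can batch those. The sketch below assumes a hypothetical bulk method on the same DAO, e.g. Map<String, List<Integer>> getBatchResults(List<String>), backed by a single HBase multi-get; it also needs java.util.Iterator and org.apache.spark.api.java.function.PairFlatMapFunction:

    JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Tuple2<Long, String>>, Long, Integer>() {
          @Override
          public Iterable<Tuple2<Long, Integer>> call(Iterator<Tuple2<Long, String>> part)
              throws Exception {
            // Buffer the partition's keys so they can go out in one request.
            List<Tuple2<Long, String>> entries = new ArrayList<Tuple2<Long, String>>();
            List<String> keys = new ArrayList<String>();
            while (part.hasNext()) {
              Tuple2<Long, String> e = part.next();
              entries.add(e);
              keys.add(e._2());
            }
            // Hypothetical bulk lookup: one multi-get per partition instead of
            // one RPC per record.
            Map<String, List<Integer>> results = ckdao.getBatchResults(keys);
            List<Tuple2<Long, Integer>> out = new ArrayList<Tuple2<Long, Integer>>();
            for (Tuple2<Long, String> e : entries) {
              List<Integer> ids = results.get(e._2());
              if (ids != null) {
                for (Integer id : ids) {
                  out.add(new Tuple2<Long, Integer>(e._1(), id));
                }
              }
            }
            return out;
          }
        }).distinct(32);

Whether this helps depends on how getSingelkeyresult is implemented; if each call already reuses a pooled connection, the win is smaller, but per-partition batching usually cuts RPC overhead substantially at this record count.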