Akhil, any thought on this?

On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com> wrote:

> No I did not tried the partitioning below is the full code
>
> public static  void  matchAndMerge(JavaRDD<VendorRecord>
> matchRdd,JavaSparkContext jsc) throws IOException{
>  long start = System.currentTimeMillis();
>   JavaPairRDD<Long, MatcherReleventData> RddForMarch
> =matchRdd.zipWithIndex().mapToPair(new
> PairFunction<Tuple2<VendorRecord,Long>, Long, MatcherReleventData>() {
>
> @Override
> public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t)
> throws Exception {
> MatcherReleventData matcherData = new MatcherReleventData();
> Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long,
> MatcherReleventData>(t._2,
> matcherData.convertVendorDataToMatcherData(t._1));
>  return tuple;
> }
>
> }).cache();
>  log.info("after index"+RddForMarch.take(1));
>  Map<Long, MatcherReleventData> tmp =RddForMarch.collectAsMap();
> Map<Long, MatcherReleventData> matchData = new HashMap<Long,
> MatcherReleventData>(tmp);
> final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
> jsc.broadcast(matchData);
>
> JavaPairRDD<Long,String> blockingRdd = RddForMarch.flatMapValues(new
> Function<MatcherReleventData, Iterable<String>>(){
>
> @Override
> public Iterable<String> call(MatcherReleventData v1)
> throws Exception {
> List<String> values = new ArrayList<String>();
> HelperUtilities helper1 = new HelperUtilities();
> MatcherKeys matchkeys=helper1.getBlockinkeys(v1);
> if(matchkeys.get_companyName() !=null){
> values.add(matchkeys.get_companyName());
> }
> if(matchkeys.get_phoneNumberr() !=null){
> values.add(matchkeys.get_phoneNumberr());
> }
> if(matchkeys.get_zipCode() !=null){
> values.add(matchkeys.get_zipCode());
> }
> if(matchkeys.getM_domain() !=null){
> values.add(matchkeys.getM_domain());
> }
>   return values;
> }
>  });
>  log.info("blocking RDD is"+blockingRdd.count());
> int count=0;
> log.info("Starting printing");
>   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>
>   log.info(entry._1() + ":" + entry._2());
>       count++;
>     }
>   log.info("total count"+count);
>  JavaPairRDD<Long,Integer>
> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
> Iterable<Integer>>(){
>
> @Override
> public Iterable<Integer> call(String v1) throws Exception {
> return ckdao.getSingelkeyresult(v1);
> }
>  }).distinct(32);
>  log.info("after hbase count is"+completeDataToprocess.count());
>  log.info("data for process"+completeDataToprocess.take(1));
>  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore
> =completeDataToprocess.mapToPair( new PairFunction<Tuple2<Long,Integer>,
> Long, Tuple2<Integer, Double>>(){
>
> @Override
> public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
> throws Exception {
> Scoring scoreObj = new Scoring();
> double score =scoreObj.computeMatchScore(companyDAO.get(t._2()),
> dataMatchGlobal.getValue().get(t._1()));
> Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(),
> score);
> Tuple2<Long, Tuple2<Integer, Double>> tuple = new Tuple2<Long,
> Tuple2<Integer,Double>>(t._1(), maptuple);
> return tuple;
> }
>  });
>  log.info("with score tuple is"+withScore.take(1));
>  JavaPairRDD<Long, Tuple2<Integer,Double>> maxScoreRDD
> =withScore.reduceByKey( new Function2<Tuple2<Integer,Double>,
> Tuple2<Integer,Double>, Tuple2<Integer,Double>>(){
>
> @Override
> public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
> Tuple2<Integer, Double> v2) throws Exception {
>  int res =v1._2().compareTo(v2._2());
> if(res >0){
>  Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v1._1(),
> v1._2());
> return result;
>  }
> else if(res<0){
> Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
> v2._2());
> return result;
> }
> else{
> Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
> v2._2());
> return result;
> }
>   }
>  });
>  log.info("max score RDD"+maxScoreRDD.take(10));
>
>  maxScoreRDD.foreach( new
> VoidFunction<Tuple2<Long,Tuple2<Integer,Double>>>(){
>
> @Override
> public void call(Tuple2<Long, Tuple2<Integer, Double>> t)
> throws Exception {
> MatcherReleventData matchedData=dataMatchGlobal.getValue().get(t._1());
> log.info("broadcast is"+dataMatchGlobal.getValue().get(t._1()));
> //Set the score for better understanding of merge
> matchedData.setScore(t._2()._2());
> vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),"Souce_id");
>  }
>  });
>  log.info("took " + (System.currentTimeMillis() - start) + " mills to run
> matcher");
>
>
>
>  }
>
>
> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> Can you paste your complete code? Did you try repartioning/increasing
>> level of parallelism to speed up the processing. Since you have 16 cores,
>> and I'm assuming your 400k records isn't bigger than a 10G dataset.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <gangele...@gmail.com
>> > wrote:
>>
>>> I already checked and G is taking 1 secs for each task. is this too
>>> much? if yes how to avoid this?
>>>
>>>
>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Open the driver ui and see which stage is taking time, you can look
>>>> whether its adding any GC time etc.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <
>>>> gangele...@gmail.com> wrote:
>>>>
>>>>> Hi All I have below code whether distinct is running for more time.
>>>>>
>>>>> blockingRdd is the combination of <Long,String> and it will have 400K
>>>>> records
>>>>> JavaPairRDD<Long,Integer>
>>>>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
>>>>> Iterable<Integer>>(){
>>>>>
>>>>> @Override
>>>>> public Iterable<Integer> call(String v1) throws Exception {
>>>>> return ckdao.getSingelkeyresult(v1);
>>>>> }
>>>>>  }).distinct(32);
>>>>>
>>>>> I am running distinct on 800K records and its taking 2 hours on 16
>>>>> cores and 20 GB RAM.
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>
>
>
>

Reply via email to