Strange duplicates in data when scaling up
Issue was solved by clearing the HashMap and HashSet at the beginning of the call method.

From: Jacob Maloney [mailto:jmalo...@conversantmedia.com]
Sent: Thursday, October 16, 2014 5:09 PM
To: user@spark.apache.org
Subject: Strange duplicates in data when scaling up
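The fix described above can be sketched in plain Java (no Spark; the class and field names are illustrative, not from the thread): a function object that keeps its HashSet/HashMap as instance fields must clear them at the top of call(), so no state survives from the previous record.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AggregateFix {
    // Per-record scratch state kept as instance fields, as in the thread.
    private final Set<String> output = new HashSet<>();
    private final Map<String, Integer> counts = new HashMap<>();

    public Set<String> call(List<String> record) {
        // The fix from the thread: reset per-record state before processing,
        // so nothing accumulated for an earlier record leaks into this one.
        output.clear();
        counts.clear();
        for (String item : record) {
            counts.merge(item, 1, Integer::sum);
        }
        output.addAll(counts.keySet());
        // Return a copy so the caller's view is not wiped by the next clear().
        return new HashSet<>(output);
    }
}
```

An alternative with the same effect is to make the collections method-local inside call(), which removes the shared state entirely.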
Strange duplicates in data when scaling up
I have a flatmap function that shouldn't possibly emit duplicates, and yet it does. The output of my function is a HashSet, so the function itself cannot output duplicates, yet I see many copies of keys emitted from it (in one case up to 62). The curious thing is that I can't get this to happen until I ramp up the size of the input to about 100,000 lines. For example:

    (3587005221,[[(80530632,0.20824391739360665)], [(80530632,0.20824391739360665)]])

will expand to

    (3587005221,(80530632,0.37312230565577803))
    (3587005221,(80530632,0.37312230565577803))
    (3587005221,(80530632,0.37312230565577803))
    ...
    (3587005221,(80530632,0.37312230565577803))

62 total times. If I run this line only as input, I get the one line of output as expected. It seems to be a scaling-up issue.

My code is as follows:

    JavaPairRDD<Long,Iterable<Iterable<Tuple2<Integer,Double>>>> preAggData = indidKeyedJoinedData.groupByKey();
    JavaPairRDD<Long,Tuple2<Integer,Double>> aggregatedData = preAggData.flatMapToPair(new AggregateLikeSims());

Where:

    static class AggregateLikeSims implements PairFlatMapFunction<Tuple2<Long,Iterable<Iterable<Tuple2<Integer,Double>>>>, Long, Tuple2<Integer,Double>> {
        HashSet<Tuple2<Long,Tuple2<Integer,Double>>> output = new HashSet<Tuple2<Long,Tuple2<Integer,Double>>>();
        Map<Integer,List<Double>> intermediateMap = new HashMap<Integer,List<Double>>();
        Iterator<Tuple2<Integer,Double>> intIterator;
        Tuple2<Integer,Double> currentTuple;
        Double MAX_RECO_VALUE = 1.0;
        Iterator<Iterable<Tuple2<Integer,Double>>> itIterator;
        Accumulator<Integer> accum;

        @Override
        public Iterable<Tuple2<Long,Tuple2<Integer,Double>>> call(Tuple2<Long,Iterable<Iterable<Tuple2<Integer,Double>>>> inTuple) {
            itIterator = inTuple._2.iterator();
            while (itIterator.hasNext()) {
                intIterator = itIterator.next().iterator();
                while (intIterator.hasNext()) {
                    currentTuple = intIterator.next();
                    if (intermediateMap.containsKey(currentTuple._1)) {
                        intermediateMap.get(currentTuple._1).add(currentTuple._2);
                    } else {
                        List<Double> listOfDoubles = new ArrayList<Double>();
                        listOfDoubles.add(currentTuple._2);
                        intermediateMap.put(currentTuple._1, listOfDoubles);
                    }
                }
            }

            Iterator<Map.Entry<Integer,List<Double>>> it = intermediateMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer,List<Double>> pairs = it.next();
                if (pairs.getValue().size() > 1) {
                    output.add(new Tuple2<Long,Tuple2<Integer,Double>>(inTuple._1, new Tuple2<Integer,Double>(pairs.getKey(), aggregate(pairs.getValue()))));
                } else {
                    output.add(new Tuple2<Long,Tuple2<Integer,Double>>(inTuple._1, new Tuple2<Integer,Double>(pairs.getKey(), pairs.getValue().get(0))));
                }
                it.remove();
            }
            return output;
        }

        private double aggregate(List<Double> simsList) {
            if (simsList == null) {
                return 0;
            }
            if (simsList.size() == 1) {
                return simsList.get(0);
            }
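The symptom above reduces to plain Java (no Spark; names are hypothetical): when a function object keeps `output` as an instance field and never clears it, and the same instance is reused for several records in a partition, each call() re-emits everything accumulated for earlier records, so the flattened stream contains duplicates even though each individual HashSet cannot.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StaleStateBug {
    // Never cleared between calls -- mirrors the `output` field above.
    private final Set<String> output = new HashSet<>();

    public Set<String> call(List<String> record) {
        // Leftovers from earlier records are still in here.
        output.addAll(record);
        return new HashSet<>(output);
    }
}
```

Flattening the per-call results, the second record's output re-emits the first record's key, which is exactly a duplicate in the downstream data.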
Issue with java spark broadcast
I'm trying to broadcast an accumulator I generated earlier in my app. However, I get a NullPointerException whenever I reference the value.

    // The start of my accumulator generation
    LookupKeyToIntMap keyToIntMapper = new LookupKeyToIntMap();
    keyToIntMapper.setNumPartitions(intermediatePair.splits().size());
    keyToIntMapper.setMapAccumulator(keyToIntMap);
    JavaRDD<Tuple2<Integer,Iterable<Long>>> intermediateIntsTuple = intermediatePair.mapPartitionsWithIndex(keyToIntMapper, false);
    JavaPairRDD<Integer,Iterable<Long>> intermediatePairInts = JavaPairRDD.fromJavaRDD(intermediateIntsTuple);
    JavaPairRDD<Integer,Tuple2<Integer,Integer>> sims = intermediatePairInts.mapValues(new SelfSim());

    // I force the RDD to evaluate so as to avoid laziness issues
    Map<Integer,Tuple2<Integer,Integer>> simsMap = sims.collectAsMap();

    // Broadcast the map
    // If I include a print statement here on the accumulator, I can print the map out successfully
    broadcastVar = ctx.broadcast(keyToIntMap.value());

    // Here I try to access the broadcasted map
    JavaPairRDD<Integer,Long> indidIntKeyPair = indidKeyPairFiltered.mapToPair(new PairFunction<Tuple2<String,Long>, Integer, Long>() {
        @Override
        public Tuple2<Integer,Long> call(Tuple2<String,Long> keyVal) throws Exception {
            Integer outInt = broadcastVar.value().inverse().get(keyVal._1);
            return new Tuple2<Integer,Long>(outInt, keyVal._2);
        }
    });

This works just fine when I run it locally, but when I move it to a cluster environment it throws NullPointerExceptions. My question is: why can't I access this map? And what do I have to do to make it accessible?

Thanks,
Jacob
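This question receives no answer in the thread, but one common cause of this exact symptom is that broadcastVar is a static (or otherwise driver-side) field: Java serialization never ships static fields with a closure, so on the executors the field comes up null, while in local mode everything shares one JVM and the field is visible. The usual pattern is to capture the Broadcast handle in a local variable used inside the closure. A plain-Java sketch of the mechanism (no Spark; all names hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.function.Function;

public class BroadcastCapture {

    // Stands in for a field set on the driver: it lives only in the driver's
    // JVM and is NOT serialized with the closure, so on an executor it is null.
    static Map<String, Integer> driverSideField;

    public interface SerFn extends Function<String, Integer>, Serializable {}

    // Resolves the field at call time, in whatever JVM runs the closure.
    public static SerFn badClosure() {
        return key -> driverSideField.get(key);
    }

    // Captures a local reference, which travels inside the serialized closure.
    public static SerFn goodClosure(Map<String, Integer> localRef) {
        return key -> localRef.get(key);
    }

    // Serialize and deserialize, simulating shipping the closure to an executor.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.close();
        return (T) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
    }
}
```

Nulling the static field after the round trip simulates a fresh executor JVM: the closure that captured a local reference still works, while the one reading the static field throws the NullPointerException seen on the cluster.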