Strange duplicates in data when scaling up

2014-10-17 Thread Jacob Maloney
The issue was solved by clearing the HashMap and HashSet at the beginning of the 
call method.
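For readers hitting the same symptom, the underlying failure mode is worth spelling out: Spark deserializes one instance of the function per task and invokes call() once per record, so instance fields such as `output` and `intermediateMap` carry state from one call into the next, and keys from earlier records get re-emitted for later ones. A minimal plain-Java sketch of that behavior (class and method names here are illustrative, not from the original job):

```java
import java.util.HashSet;
import java.util.Set;

// Mimics a flatMap-style function object whose instance field survives
// across call() invocations, as a reused Spark function instance does.
class ReusedStateDemo {
    // Shared between calls, like the `output` HashSet in the thread.
    private final Set<String> output = new HashSet<>();

    // Buggy variant: never clears `output`, so keys from earlier records
    // leak into the result returned for later records.
    Set<String> callWithoutClear(String key) {
        output.add(key);
        return output;
    }

    // Fixed variant: clear the reused collection at the top of call(),
    // which is the resolution reported above.
    Set<String> callWithClear(String key) {
        output.clear();
        output.add(key);
        return output;
    }
}
```

Declaring the collections as locals inside call() avoids the problem entirely and is arguably the more idiomatic fix, since it also sidesteps any question of stale state when the same function instance processes many records.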


From: Jacob Maloney [mailto:jmalo...@conversantmedia.com]
Sent: Thursday, October 16, 2014 5:09 PM
To: user@spark.apache.org
Subject: Strange duplicates in data when scaling up

I have a flatMap function that shouldn't be able to emit duplicates, and yet it 
does. The output of my function is a HashSet, so the function itself cannot 
output duplicates, and yet I see many copies of keys emitted from it (in one 
case up to 62). The curious thing is I can't get this to happen until I ramp up 
the size of the input to about 100,000 lines. For example:
(3587005221,[[(80530632,0.20824391739360665)], 
[(80530632,0.20824391739360665)]])

Will expand to
(3587005221,(80530632,0.37312230565577803))
(3587005221,(80530632,0.37312230565577803))
(3587005221,(80530632,0.37312230565577803))
.
.
.
(3587005221,(80530632,0.37312230565577803))
62 total times

If I run this line alone as input, I get only the one line of output, as 
expected. It seems to be a scaling-up issue.

My code is as follows:
JavaPairRDD<Long,Iterable<Iterable<Tuple2<Integer,Double>>>> preAggData =
        indidKeyedJoinedData.groupByKey();

JavaPairRDD<Long,Tuple2<Integer,Double>> aggregatedData =
        preAggData.flatMapToPair(new AggregateLikeSims());

Where:
static class AggregateLikeSims implements
        PairFlatMapFunction<Tuple2<Long,Iterable<Iterable<Tuple2<Integer,Double>>>>,
                Long, Tuple2<Integer,Double>> {
    HashSet<Tuple2<Long,Tuple2<Integer,Double>>> output =
            new HashSet<Tuple2<Long,Tuple2<Integer,Double>>>();
    Map<Integer,List<Double>> intermediateMap = new HashMap<Integer,List<Double>>();
    Iterator<Tuple2<Integer,Double>> intIterator;
    Tuple2<Integer,Double> currentTuple;
    Double MAX_RECO_VALUE = 1.0;
    Iterator<Iterable<Tuple2<Integer,Double>>> itIterator;
    Accumulator<Integer> accum;

    @Override
    public Iterable<Tuple2<Long,Tuple2<Integer,Double>>> call(
            Tuple2<Long,Iterable<Iterable<Tuple2<Integer,Double>>>> inTuple) {
        itIterator = inTuple._2.iterator();

        while (itIterator.hasNext()) {
            intIterator = itIterator.next().iterator();
            while (intIterator.hasNext()) {
                currentTuple = intIterator.next();
                if (intermediateMap.containsKey(currentTuple._1)) {
                    intermediateMap.get(currentTuple._1).add(currentTuple._2);
                } else {
                    List<Double> listOfDoubles = new ArrayList<Double>();
                    listOfDoubles.add(currentTuple._2);
                    intermediateMap.put(currentTuple._1, listOfDoubles);
                }
            }
        }

        Iterator<Map.Entry<Integer,List<Double>>> it =
                intermediateMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer,List<Double>> pairs = it.next();
            if (pairs.getValue().size() > 1) {
                output.add(new Tuple2<Long,Tuple2<Integer,Double>>(inTuple._1,
                        new Tuple2<Integer,Double>(pairs.getKey(),
                                aggregate(pairs.getValue()))));
            } else {
                output.add(new Tuple2<Long,Tuple2<Integer,Double>>(inTuple._1,
                        new Tuple2<Integer,Double>(pairs.getKey(),
                                pairs.getValue().get(0))));
            }
            it.remove();
        }

        return output;
    }

    private double aggregate(List<Double> simsList) {
        if (simsList == null) {
            return 0;
        }
        if (simsList.size() == 1) {
            return simsList.get(0);
        }



Issue with java spark broadcast

2014-10-10 Thread Jacob Maloney
I'm trying to broadcast an accumulator I generated earlier in my app. However, I 
get a NullPointerException whenever I reference its value.

// The start of my accumulator generation
LookupKeyToIntMap keyToIntMapper = new LookupKeyToIntMap();
keyToIntMapper.setNumPartitions(intermediatePair.splits().size());
keyToIntMapper.setMapAccumulator(keyToIntMap);
JavaRDD<Tuple2<Integer,Iterable<Long>>> intermediateIntsTuple =
        intermediatePair.mapPartitionsWithIndex(keyToIntMapper, false);

JavaPairRDD<Integer,Iterable<Long>> intermediatePairInts =
        JavaPairRDD.fromJavaRDD(intermediateIntsTuple);

JavaPairRDD<Integer,Tuple2<Integer,Integer>> sims =
        intermediatePairInts.mapValues(new SelfSim());

// I force the RDD to evaluate so as to avoid laziness issues
Map<Integer,Tuple2<Integer,Integer>> simsMap = sims.collectAsMap();

// Broadcast the map
// If I include a print statement here on the accumulator I can
// print the map out successfully
broadcastVar = ctx.broadcast(keyToIntMap.value());

// Here I try to access the broadcast map
JavaPairRDD<Integer,Long> indidIntKeyPair =
        indidKeyPairFiltered.mapToPair(new PairFunction<Tuple2<String,Long>, Integer, Long>() {
            @Override
            public Tuple2<Integer,Long> call(Tuple2<String,Long> keyVal) throws Exception {
                Integer outInt = broadcastVar.value().inverse().get(keyVal._1);
                return new Tuple2<Integer,Long>(outInt, keyVal._2);
            }
        });

This works just fine when I run it locally, but when I move it to a cluster 
environment it throws NullPointerExceptions. My question is: why can't I access 
this map, and what do I have to do to make it accessible?
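One common cause of exactly this symptom (works locally, NPE on a cluster) is that broadcastVar is held in a static or otherwise driver-only field: Java serialization ships the closure to the executors, but static state never travels with it, so a freshly started executor JVM sees the field as null. The snippet doesn't show how broadcastVar is declared, so treat this as a hypothesis. The sketch below simulates the effect in a single JVM by serializing a lambda that reads a static field and nulling the field before deserializing (all names are illustrative, not Spark API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

class StaticCaptureDemo {
    interface SerSupplier extends Supplier<String>, Serializable {}

    // Stands in for a broadcast handle stored in a driver-only static field.
    static String driverOnlyState;

    // Serializes a closure that reads the static field, clears the field to
    // imitate a freshly started executor JVM, then runs the shipped copy.
    static String shipAndRun() throws IOException, ClassNotFoundException {
        driverOnlyState = "lookup-map";
        SerSupplier task = () -> driverOnlyState; // resolved at run time, not at capture time

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(task); // the bytes contain the lambda, not the static's value
        }

        driverOnlyState = null; // an executor JVM never ran the driver's assignment

        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            SerSupplier shipped = (SerSupplier) in.readObject();
            return shipped.get(); // null: dereferencing it in real code throws NPE
        }
    }
}
```

If this is the cause, the usual remedy is to assign the result of ctx.broadcast(...) to a final local variable (or a serializable instance field of the function class) before constructing the closure, so the Broadcast handle itself is serialized with the task and resolved on the executor.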

Thanks,

Jacob
