Thanks for the suggestion! That does look really helpful, I see what you mean about it being more general than fold. I think I will replace my fold with aggregate - it should give me more control over the process.
I think the problem will still exist though - which is that I can't get the correct partitioning I need. When I change my key to user_id, I lose the timestamp partitioning. My problem is that I'm trying to retain a parent RDD's partitioning in an RDD that no longer has the same keys as its parent. On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com> wrote: > Hi Michael, > > I'm not sure I fully understood your question, but I think RDD.aggregate > can be helpful in your case. You can see it as a more general version of > fold. > > Cheng > > > > On 10/16/14 11:15 PM, Michael Misiewicz wrote: > > Hi, > > I'm working on a problem where I'd like to sum items in an RDD *in order > (*approximately*)*. I am currently trying to implement this using a fold, > but I'm having some issues because the sorting key of my data is not the > same as the folding key for my data. I have data that looks like this: > > user_id, transaction_timestamp, transaction_amount > > And I'm interested in doing a foldByKey on user_id to sum transaction > amounts - taking care to note approximately when a user surpasses a total > transaction threshold. I'm using RangePartitioner to make sure that data > is ordered sequentially between partitions, and I'd also make sure that > data is sorted within partitions, though I'm not sure how to do this > exactly (I was going to look at the code for sortByKey to figure this out > - I believe sorting in place in a mapPartitions should work). What do you > think about the approach? Here's some sample code that demonstrates what > I'm thinking: > > def myFold(V1:Float, V2:Float) : Float = { > val partialSum = V1 + V2 > if (partialSum >= 500) { > // make a note of it, do things > } > return partialSum > } > > val rawData = sc.textFile("hdfs://path/to/data").map{ x => // load data > l = x.split() > (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long, > transaction_timestamp:long, transaction_amount:float > } > val keyByTimestamp = rawData.map(x=> (x._2, (x._1, x._3))) // rearrange > to make timestamp the key (for sorting), convert to PairRDD > val sortedByTimestamp = keyByTimestamp.sortByKey() > val partitionedByTimestamp = sortedByTimestamp.partitionBy( > new org.apache.spark.RangePartitioner(partitions=500, > rdd=sortedByTimestamp)).persist() > // By this point, the RDD should be sorted and partitioned according to > the timestamp. However, I need to now make user_id the key, > // because the output must be per user. At this point, since I change the > keys of the PairRDD, I understand that I lose the partitioning > // the consequence of this is that I can no longer be sure in my fold > function that the ordering is retained. > > val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2)) > val finalResult = keyByUser.foldByKey(zeroValue=0)(myFold) > finalResult.saveAsTextFile("hdfs://...") > > The problem as you'd expect takes place in the folding function, after > I've re-arranged my RDD to no longer be keyed by timestamp (when I produce > keyByUser, I lose the correct partitioning). As I've read in the > documentation, partitioning is not preserved when keys are changed (which > makes sense). > > Reading this thread: > https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it > appears that one possible solution might be to subclass RDD (à la > MappedValuesRDD) to define my own RDD that retains the partitions of its > parent. This seems simple enough, but I've never done anything like that > before, but I'm not sure where to start. I'm also willing to write my own > custom partitioner class, but it appears that the getPartition method > only accepts a "key" argument - and since the value I need to partition on > in the final step (the timestamp) would be in the Value, my > partitioner class doesn't have the data it needs to make the right > decision. I cannot have timestamp in my key. > > Alternatively, has anyone else encountered a problem like this (i.e. an > approximately ordered sum) and did they find a good solution? Does my > approach of subclassing RDD make sense? Would there be some way to > finagle a custom partitioner into making this work? Perhaps this might be a > job for some other tool, like spark streaming? > > Thanks, > Michael > > >