Would something like this help? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
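PartitionPruningRDD prunes by partition index, so if you can map your area of interest to partition indices up front, the partition half of the filter could look roughly like this. This is just an untested sketch; `tiles` and `wantedPartitions` are made-up names, and in your case the index set would come from whatever metadata maps tiles to partitions:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.PartitionPruningRDD

object PruningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pruning-sketch").setMaster("local[*]"))

    // Stand-in for the tile RDD: 8 partitions, keyed by tile id.
    val tiles = sc.parallelize(0 until 800, 8).map(id => (id, s"tile-$id"))

    // Hypothetical: partition indices known (from external metadata)
    // to overlap the area of interest.
    val wantedPartitions = Set(2, 3)

    // PartitionPruningRDD only schedules tasks for partitions that pass
    // the filter, so the other partitions are never read at all.
    val pruned = PartitionPruningRDD.create(tiles, i => wantedPartitions.contains(i))
    println(pruned.partitions.length) // 2

    sc.stop()
  }
}

It only addresses the partition-level filter, not skipping value deserialization within a partition.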
On Thu, Jul 24, 2014 at 8:40 AM, Eugene Cheipesh <echeip...@gmail.com> wrote:
> Hello,
>
> I have an interesting use case for a pre-filtered RDD. I have two solutions
> that I am not entirely happy with and would like to get some feedback and
> thoughts. Perhaps it is a use case that could be more explicitly supported
> in Spark.
>
> My data has well-defined semantics for the key values, which I can use to
> pre-filter an RDD so that partitions and records I will not need are never
> loaded at all. In most cases this is a significant savings.
>
> Essentially the dataset is geographic image tiles, as you would see on
> Google Maps. The entire dataset could be huge, covering an entire continent
> at high resolution. But if I want to work with a subset, let's say a single
> city, it makes no sense to load all the partitions into memory just so I can
> filter them as a first step.
>
> My first attempt was to extend NewHadoopRDD as follows:
>
> abstract class PreFilteredHadoopRDD[K, V](
>     sc: SparkContext,
>     inputFormatClass: Class[_ <: InputFormat[K, V]],
>     keyClass: Class[K],
>     valueClass: Class[V],
>     @transient conf: Configuration)
>   extends NewHadoopRDD(sc, inputFormatClass, keyClass, valueClass, conf) {
>
>   /** Returns true if the given partition contains relevant keys. */
>   def includePartition(p: Partition): Boolean
>
>   /** Returns true if the given key in the partition passes the filter. */
>   def includeKey(key: K): Boolean
>
>   override def getPartitions: Array[Partition] = {
>     val partitions = super.getPartitions
>     partitions.filter(includePartition)
>   }
>
>   override def compute(theSplit: Partition, context: TaskContext) = {
>     val ii = super.compute(theSplit, context)
>     new InterruptibleIterator(ii.context,
>       ii.delegate.filter { case (k, v) => includeKey(k) })
>   }
> }
>
> NewHadoopRDD for reference:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
>
> This is nice and handles the partition side of the problem well enough, but
> by the time the iterator is created by super.compute there is no way to
> avoid reading the values of records that do not pass my filter.
>
> Since I am actually using SequenceFileInputFormat as my InputFormat I can do
> better and avoid deserializing the values, if only I could get my hands on
> the reader and re-implement compute(). But this does not seem possible to do
> through extension, because both NewHadoopRDD.confBroadcast and
> NewHadoopPartition are private. There does not seem to be any choice but to
> copy/paste NewHadoopRDD instead of extending it.
>
> The two solutions that seem apparent are:
> 1. remove those private modifiers
> 2. factor reader creation out into a method that a subclass can use to
>    reimplement compute()
>
> I would be curious to hear if anybody had/has a similar problem and any
> thoughts on the issue. If you think there is a PR in this I'd be happy to
> code it up and submit it.
>
> Thank you
> --
> Eugene Cheipesh
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/pre-filtered-hadoop-RDD-use-case-tp7484.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
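On the value-deserialization point: the key-first read pattern you want is already available on SequenceFile.Reader itself, which is what makes exposing the reader (or the reader-creation hook you propose) attractive. Roughly, and outside of Spark entirely, it would look like the sketch below; `path` and `passesFilter` are illustrative stand-ins, and this is untested:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{SequenceFile, Writable}
import org.apache.hadoop.util.ReflectionUtils

object KeyFirstRead {
  // Read keys first and only materialize values for records that pass the filter.
  def filteredRead(conf: Configuration, path: Path)(passesFilter: Writable => Boolean): Unit = {
    val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))
    try {
      val key   = ReflectionUtils.newInstance(reader.getKeyClass, conf).asInstanceOf[Writable]
      val value = ReflectionUtils.newInstance(reader.getValueClass, conf).asInstanceOf[Writable]
      while (reader.next(key)) {          // deserializes only the key
        if (passesFilter(key)) {
          reader.getCurrentValue(value)   // value deserialized only for matching keys
          // ... hand (key, value) to downstream processing ...
        }
      }
    } finally {
      reader.close()
    }
  }
}

If the reader creation in NewHadoopRDD were factored out as you suggest (your option 2), a subclass could implement compute() along these lines and skip value deserialization for filtered-out keys.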