Would something like this help?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
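PartitionPruningRDD wraps an existing RDD and keeps only the partitions whose
index passes a predicate, so the dropped partitions are never scheduled or
read. A rough, untested sketch of how it might fit your case (fullRdd and
wantedPartitions are placeholders you would derive from your tile metadata):

import org.apache.spark.rdd.PartitionPruningRDD

// Hypothetical: partition indices known, from your own spatial index, to
// overlap the query area.
val wantedPartitions: Set[Int] = Set(3, 4, 7)

// Keep only those partitions; the rest are never computed or read.
val pruned = PartitionPruningRDD.create(fullRdd, wantedPartitions.contains)

It only handles the partition-level pruning, so the record-level part would
still be a plain filter, but the pruned partitions never get touched.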

On Thu, Jul 24, 2014 at 8:40 AM, Eugene Cheipesh <echeip...@gmail.com>
wrote:

> Hello,
>
> I have an interesting use case for a pre-filtered RDD. I have two solutions
> that I am not entirely happy with and would like to get some feedback and
> thoughts. Perhaps it is a use case that could be more explicitly supported
> in Spark.
>
> My data has well-defined semantics for the key values, which I can use to
> pre-filter an RDD so that the partitions and records I will not need are
> never loaded at all. In most cases this is a significant savings.
>
> Essentially the dataset is geographic image tiles, as you would see on
> Google Maps. The entire dataset could be huge, covering an entire continent
> at high resolution. But if I want to work with a subset, let's say a single
> city, it makes no sense for me to load all the partitions into memory just
> so I can filter them as a first step.
>
> My first attempt was to extend NewHadoopRDD as follows:
>
> abstract class PreFilteredHadoopRDD[K, V](
>     sc : SparkContext,
>     inputFormatClass: Class[_ <: InputFormat[K, V]],
>     keyClass: Class[K],
>     valueClass: Class[V],
>     @transient conf: Configuration)
>   extends NewHadoopRDD(sc, inputFormatClass, keyClass, valueClass, conf)
> {
>   /** returns true if specific partition has relevant keys */
>   def includePartition(p: Partition): Boolean
>
>   /** returns true if the specific key in the partition passes the filter */
>   def includeKey(key: K): Boolean
>
>   override def getPartitions: Array[Partition] = {
>     val partitions = super.getPartitions
>     partitions.filter(includePartition)
>   }
>
>   override def compute(theSplit: Partition, context: TaskContext) = {
>     val ii = super.compute(theSplit, context)
>     new InterruptibleIterator(ii.context,
>       ii.delegate.filter { case (k, v) => includeKey(k) })
>   }
> }
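>
> For concreteness, a subclass for my tile case might look roughly like the
> sketch below (the key/value types, the tile-id set, and the partition
> predicate are placeholders for whatever metadata maps partitions and keys
> to geographic extents; Hadoop io/input imports omitted, as above):
>
> class TileFilteredRDD(
>     sc: SparkContext,
>     @transient conf: Configuration,
>     keepTiles: Set[Long],                 // hypothetical: tile ids in the query area
>     partitionMayContain: Int => Boolean)  // hypothetical: from an external spatial index
>   extends PreFilteredHadoopRDD[LongWritable, BytesWritable](
>     sc,
>     classOf[SequenceFileInputFormat[LongWritable, BytesWritable]],
>     classOf[LongWritable],
>     classOf[BytesWritable],
>     conf)
> {
>   def includePartition(p: Partition): Boolean = partitionMayContain(p.index)
>   def includeKey(key: LongWritable): Boolean = keepTiles.contains(key.get)
> }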
>
> NewHadoopRDD for reference:
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
>
> This is nice and handles the partition portion of the issue well enough,
> but by the time the iterator is created by super.compute there is no way to
> avoid reading the values of records that do not pass my filter.
>
> Since I am actually using SequenceFileInputFormat as my InputFormat, I
> could do better and avoid deserializing the values entirely if I could get
> my hands on the reader and re-implement compute(). But this does not seem
> possible to do through extension, because both NewHadoopRDD.confBroadcast
> and NewHadoopPartition are private. There does not seem to be any choice
> but to copy/paste NewHadoopRDD and extend the copy.
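>
> To illustrate what I am after, with a raw SequenceFile.Reader the value can
> be left undeserialized for any key that fails the filter. This is only a
> rough sketch of the idea (the path and the filter are placeholders), not
> what a re-implemented compute() would literally look like:
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{SequenceFile, Writable}
> import org.apache.hadoop.util.ReflectionUtils
>
> def includeKey(key: Writable): Boolean = true  // placeholder: real filter goes here
>
> val conf = new Configuration()
> val reader = new SequenceFile.Reader(conf,
>   SequenceFile.Reader.file(new Path("hdfs:///tiles/part-00000")))  // placeholder path
> val key = ReflectionUtils.newInstance(reader.getKeyClass, conf).asInstanceOf[Writable]
> val value = ReflectionUtils.newInstance(reader.getValueClass, conf).asInstanceOf[Writable]
> try {
>   while (reader.next(key)) {          // reads and deserializes the key only
>     if (includeKey(key)) {
>       reader.getCurrentValue(value)   // the value is deserialized only when wanted
>       // emit (key, value)
>     }
>   }
> } finally {
>   reader.close()
> }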
>
> The two apparent solutions are:
> 1. remove those private modifiers, or
> 2. factor out reader creation into a method that can be used to reimplement
> compute() in a subclass.
>
> I would be curious to hear if anybody has had a similar problem, and any
> thoughts on the issue. If you think there is a PR in this, I'd be happy to
> code it up and submit it.
>
>
> Thank you
> --
> Eugene Cheipesh
>
>
