Hello,

I have an interesting use case for a pre-filtered RDD. I have two solutions
that I am not entirely happy with and would like to get some feedback and
thoughts. Perhaps it is a use case that could be more explicitly supported
in Spark.

My data has well-defined semantics for the key values that I can use to
pre-filter an RDD, so that partitions and records I will not need are never
loaded at all. In most cases this is a significant savings.

Essentially the dataset is geographic image tiles, as you would see on
Google Maps. The entire dataset could be huge, covering an entire continent
at high resolution. But if I want to work with a subset, let's say a single
city, it makes no sense for me to load all the partitions into memory just
so I can filter them out as a first step.

My first attempt was to extend NewHadoopRDD as follows:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.NewHadoopRDD

abstract class PreFilteredHadoopRDD[K, V](
    sc: SparkContext,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    @transient conf: Configuration)
  extends NewHadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, conf)
{
  /** Returns true if the given partition may contain relevant keys. */
  def includePartition(p: Partition): Boolean

  /** Returns true if the specific key in the partition passes the filter. */
  def includeKey(key: K): Boolean

  /** Drop the partitions that cannot contain any keys of interest. */
  override def getPartitions: Array[Partition] =
    super.getPartitions.filter(includePartition)

  /** Filter each partition's records down to the keys of interest. */
  override def compute(theSplit: Partition, context: TaskContext) = {
    val ii = super.compute(theSplit, context)
    new InterruptibleIterator(ii.context,
      ii.delegate.filter { case (k, _) => includeKey(k) })
  }
}
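
For context, a concrete subclass for the tile case might look roughly like
the one below. This is only a sketch: the key layout (tileId = row *
tilesPerRow + col) and the assumption that partition i holds tile row i are
made up for illustration; the real code derives this from the dataset's
metadata.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.{Partition, SparkContext}

class CityTilesRDD(
    sc: SparkContext,
    conf: Configuration,
    tilesPerRow: Long,
    colRange: Range,
    rowRange: Range)
  extends PreFilteredHadoopRDD[LongWritable, BytesWritable](
    sc,
    classOf[SequenceFileInputFormat[LongWritable, BytesWritable]],
    classOf[LongWritable],
    classOf[BytesWritable],
    conf)
{
  /** Skip partitions (assumed to be one tile row each) outside the query window. */
  def includePartition(p: Partition): Boolean =
    rowRange.contains(p.index)

  /** Skip individual tiles whose column/row falls outside the query window. */
  def includeKey(key: LongWritable): Boolean = {
    val row = (key.get / tilesPerRow).toInt
    val col = (key.get % tilesPerRow).toInt
    rowRange.contains(row) && colRange.contains(col)
  }
}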

NewHadoopRDD for reference:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

This is nice and handles the partition portion of the issue well enough, but
by the time the iterator is created by super.compute there is no way to
avoid reading the values of records that do not pass my filter.

Since I am actually using SequenceFileInputFormat as my InputFormat I can
do better and avoid deserializing the values altogether, if only I could get
my hands on the reader and re-implement compute(). But this does not seem
possible to do through extension, because both NewHadoopRDD.confBroadcast
and NewHadoopPartition are private. There does not seem to be any choice but
to copy/paste NewHadoopRDD and modify the copy.
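
To make that concrete, the record loop inside a re-implemented compute()
could look roughly like the sketch below, which goes straight to Hadoop's
SequenceFile.Reader: reader.next(key) reads only the key and skips over the
value, and getCurrentValue() is called only for keys that pass the filter.
This assumes LongWritable keys and BytesWritable values, and is only a
sketch of the idea, not something that can currently be plugged into
NewHadoopRDD through extension.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, SequenceFile}

/** Iterate one sequence file, deserializing values only for passing keys. */
def filteredRecords(
    conf: Configuration,
    path: Path,
    includeKey: LongWritable => Boolean): Iterator[(LongWritable, BytesWritable)] = {
  val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))
  new Iterator[(LongWritable, BytesWritable)] {
    private val key = new LongWritable()
    private var nextPair: (LongWritable, BytesWritable) = _

    private def advance(): Unit = {
      nextPair = null
      // next(key) advances to the next record but does not read its value
      while (nextPair == null && reader.next(key)) {
        if (includeKey(key)) {
          val value = new BytesWritable()
          reader.getCurrentValue(value)   // the value is deserialized only here
          nextPair = (new LongWritable(key.get), value)
        }
      }
      if (nextPair == null) reader.close()
    }
    advance()

    def hasNext: Boolean = nextPair != null
    def next(): (LongWritable, BytesWritable) = {
      val current = nextPair
      advance()
      current
    }
  }
}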

The two solutions that seem apparent are:
1. remove those private modifiers, or
2. factor out reader creation into a method that a sub-class can use to
reimplement compute() (a rough sketch of what I have in mind is below).
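
For option 2, the hook would be something along these lines inside
NewHadoopRDD (the name newReader and the exact signature are just a
suggestion, not existing Spark API); a sub-class could then override
compute(), create the reader itself, and decide per record whether the
value is worth materializing:

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}

// Hypothetical protected method added to NewHadoopRDD; it owns reader
// creation so that a sub-class's compute() can drive the reader directly
// instead of going through the parent's iterator.
protected def newReader(
    split: InputSplit,
    hadoopAttemptContext: TaskAttemptContext): RecordReader[K, V] = {
  val format = inputFormatClass.newInstance
  val reader = format.createRecordReader(split, hadoopAttemptContext)
  reader.initialize(split, hadoopAttemptContext)
  reader
}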

I would be curious to hear if anybody has had a similar problem, and any
thoughts on the issue. If you think there is a PR in this I'd be happy to
code it up and submit it.


Thank you
--
Eugene Cheipesh


