Hi,

I've had some trouble developing a Specs2 matcher that checks that a
predicate holds for all the elements of an RDD, and using it to test a
simple Spark Streaming program. I've finally been able to get code that
works, which you can see at https://gist.github.com/juanrh/dffd060e3a371676b83c,
but I wanted to check with the list that I'm using the right approach.
First I defined the matcher as follows:

    import org.apache.spark.rdd.RDD
    import org.specs2.matcher.Matcher

    def foreachRecord[T](predicate: T => Boolean): Matcher[RDD[T]] = { (rdd: RDD[T]) =>
      val failingRecords = rdd.filter(! predicate(_))
      (
        failingRecords.isEmpty,
        "each record fulfils the predicate",
        s"predicate failed for records ${failingRecords.take(4).mkString(", ")} ..."
      )
    }

which works ok for examples like

    def simpleBatchTest = {
      val rdd = sc.parallelize(1 to 100, 3)
      rdd should foreachRecord(_ > 0)
    }
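
For the failing case, the ko message reports a sample of the offending
elements; something like this (a sketch with made-up values, and the exact
order of the reported records may vary across partitions):

    val rdd = sc.parallelize(Seq(1, -2, 3, -4), 2)
    rdd should foreachRecord(_ > 0)
    // fails with a message like: predicate failed for records -2, -4 ...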


The problem started when I tried to use it to check that a predicate holds
for all the batches / RDDs of a DStream. The idea was to use foreachRDD to
update a driver-local org.specs2.execute.Result object, combining the
results for all the batches with 'and':

    import scala.collection.mutable.Queue
    import org.apache.spark.streaming.{Duration, StreamingContext}
    import org.specs2.execute.{AsResult, Result}

    def simpleStreamingTest: Result = {
      val ssc = new StreamingContext(sc, Duration(300))
      val record = "hola"
      val batches = Seq.fill(5)(Seq.fill(10)(record))
      val queue = new Queue[RDD[String]]
      queue ++= batches.map(batch => sc.parallelize(batch, numSlices = 2))
      val inputDStream = ssc.queueStream(queue, oneAtATime = true)
      var result: Result = ok
      inputDStream.foreachRDD { rdd =>
        val r = AsResult { rdd should foreachRecord(_ == record) }
        result = result and r
      }
      ssc.start()
      StreamingContextUtils.awaitForNBatchesCompleted(batches.length)(ssc)
      ssc.stop(stopSparkContext = false, stopGracefully = false)
      println(s"result : ${result}")
      result
    }


This leads to an exception when running the test, because AsResult { rdd
should foreachRecord(_ == record) } fails: the closure argument of
foreachRecord needs to access the local variable 'record', and to do that
it tries to serialize the whole enclosing context, including 'result',
which fails with:

    Driver stacktrace:,org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
    java.io.InvalidClassException: org.specs2.execute.Success; no valid constructor

To solve this I followed
http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/
to specify more carefully which values are accessed by the closure,
defining the matcher

    def foreachRecord[T, C](predicateContext: C)(toPredicate: C => (T => Boolean)): Matcher[RDD[T]] = {
      val predicate = toPredicate(predicateContext)
      foreachRecord(predicate)
    }


and then replacing the call above with

      val r = AsResult { rdd should foreachRecord(record)(r => { _ == r }) }


I'm writing to the list because I wanted to confirm that this is a proper
solution, and to ask whether somebody can imagine a better one: this is
not too bad, but the call foreachRecord(record)(r => { _ == r }) is still
a bit ugly. It is curious that Spark's closure cleaner is smart enough to
avoid sending 'result' to the closure in this very similar example:

      val r2 = AsResult { rdd.filter(_ != record).count === 0 }
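
For comparison, I also wonder whether the classic idiom of copying the
captured value into a fresh local val just before building the closure
would be enough here; this is only a sketch, I haven't tried it in the
gist:

    inputDStream.foreachRDD { rdd =>
      val expected = record  // local copy, so the predicate only closes over this val
      val r = AsResult { rdd should foreachRecord(_ == expected) }
      result = result and r
    }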

Also, I wanted to confirm that the approach of updating a var in the
driver from a foreachRDD is a good idea. I understand that foreachRDD runs
in the driver, so that should be ok as long as the local variable is
updated using action results only.
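
One detail I'm not sure about (this is an assumption on my side): the
foreachRDD body runs on the streaming driver thread, while the final read
of 'result' happens on the test thread after ssc.stop(), so it might be
safer to declare the var as volatile to guarantee visibility across
threads:

    @volatile var result: Result = ok  // assumption: volatile for cross-thread visibility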

Thanks a lot in advance.

Greetings,

Juan
