Hello!
I think I found a fast and clean solution, based on take's source code:
def exists[T](rdd: RDD[T])(qualif: T => Boolean, num: Int): Boolean = {
  if (num == 0) {
    true
  } else {
    var count: Int = 0
    val totalParts: Int = rdd.partitions.length
    var partsScanned: Int = 0
    while (count < num && partsScanned < totalParts) {
      var numPartsToTry: Int = 1
      if (partsScanned > 0) {
        if (count == 0) {
          numPartsToTry = partsScanned * 4
        } else {
          numPartsToTry =
            Math.max((1.5 * num * partsScanned / count).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left: Int = num - count
      val p: Range =
        partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res: Array[Int] = rdd.sparkContext.runJob(
        rdd,
        (it: Iterator[T]) => it.filter(qualif).take(left).size,
        p,
        allowLocal = true)
      count = count + res.sum
      partsScanned += numPartsToTry
    }
    count >= num
  }
}

// val all: RDD[Any]
println(exists(all)(_ => { println(".") ; true }, 10))
It's super fast for small values of n, and I think it parallelises
nicely for large values.
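As a side note, the partition ramp-up borrowed from take can be simulated without a SparkContext. This is just an illustrative sketch (RampSchedule is a hypothetical helper, not part of the solution above): it shows how many partitions each pass would scan in the worst case where no matches are found, i.e. one partition first, then 4x the total scanned so far, capped at the remaining partitions:

```scala
object RampSchedule {
  // Simulate the per-pass partition counts when count stays at 0,
  // mirroring the numPartsToTry logic above: 1 on the first pass,
  // then partsScanned * 4 on each subsequent pass.
  def passes(totalParts: Int): Seq[Int] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[Int]
    var partsScanned = 0
    while (partsScanned < totalParts) {
      val numPartsToTry =
        if (partsScanned == 0) 1 else partsScanned * 4
      // Never scan past the end of the RDD.
      val actual = math.min(numPartsToTry, totalParts - partsScanned)
      out += actual
      partsScanned += actual
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    // With 100 partitions and no matches: passes of 1, 4, 20, 75,
    // so the worst case is only 4 Spark jobs.
    println(RampSchedule.passes(100).mkString(", "))
  }
}
```

So even when nothing qualifies, the number of jobs grows logarithmically in the partition count rather than linearly.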
Please tell me what you think.
Have a nice day,
Jonathan
On 5 August 2015 at 19:18, Jonathan Winandy <[email protected]>
wrote:
> Hello !
>
> You could try something like that :
>
> def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {
>
>   val context: SparkContext = rdd.sparkContext
>   val grp: String = Random.alphanumeric.take(10).mkString
>   context.setJobGroup(grp, "exist")
>   val count: Accumulator[Long] = context.accumulator(0L)
>
>   val iteratorToInt: (Iterator[T]) => Int = {
>     iterator =>
>       val i: Int = iterator.count(f)
>       count += i
>       i
>   }
>
>   val t = new Thread {
>     override def run {
>       while (count.value < n) {}
>       context.cancelJobGroup(grp)
>     }
>   }
>   t.start()
>   try {
>     context.runJob(rdd, iteratorToInt).sum >= n
>   } catch {
>     case e: SparkException =>
>       count.value >= n
>   } finally {
>     t.stop()
>   }
>
> }
>
>
>
> It stops the computation if enough elements satisfying the condition are
> witnessed.
>
> It is performant if the RDD is well partitioned. If that is a problem, you
> could change iteratorToInt to:
>
> val iteratorToInt: (Iterator[T]) => Int = {
>   iterator =>
>     iterator.count(x => {
>       if (f(x)) {
>         count += 1
>         true
>       } else false
>     })
> }
>
>
> I am interested in a safer way to perform partial computation in Spark.
>
> Cheers,
> Jonathan
>
> On 5 August 2015 at 18:54, Feynman Liang <[email protected]> wrote:
>
>> qualifying_function() will be executed on each partition in parallel;
>> stopping all parallel execution after the first instance that satisfies
>> qualifying_function() would effectively make the computation sequential.
>>
>> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <[email protected]>
>> wrote:
>>
>>> Okay, I think I got it now. Yes, take() does not need to be called more
>>> than once. I had the impression that we wanted to bring elements to the
>>> driver node and then run our qualifying_function on the driver node.
>>>
>>> Now, I am back to my question which I started with: Could there be an
>>> approach where the qualifying_function() does not get called after an
>>> element has been found?
>>>
>>>
>>> Regards,
>>> Sandeep Giri,
>>> +1 347 781 4573 (US)
>>> +91-953-899-8962 (IN)
>>>
>>> www.KnowBigData.com
>>> Phone: +1-253-397-1945 (Office)
>>>
>>>
>>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <[email protected]> wrote:
>>>
>>>> take only brings n elements to the driver, which is probably still a
>>>> win if n is small. I'm not sure what you mean by only taking a count
>>>> argument -- what else would be an arg to take?
>>>>
>>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <[email protected]>
>>>> wrote:
>>>>
>>>>> Yes, but in the take() approach we will be bringing the data to the
>>>>> driver, so it is no longer distributed.
>>>>>
>>>>> Also, take() takes only a count as its argument, which means that every
>>>>> time we would be transferring redundant elements.