You might be able to get stage information through a SparkListener, but I
am not sure whether you can easily use that information to kill stages.
That said, I highly recommend upgrading to Spark 1.3.1 (or even Spark
master). Things move really fast between releases; 1.1.1 feels really old
to me :P
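
As a rough illustration of the SparkListener suggestion, the sketch below
records which stages are currently running. The class name
RunningStageTracker and its snapshot method are made up for this example;
only SparkListener, its two callbacks, and SparkContext.addSparkListener
come from Spark. It only observes stages and does not attempt to kill them.

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Hypothetical helper (not part of Spark): remembers the stages that have
// been submitted but have not completed yet.
class RunningStageTracker extends SparkListener {
  // stage ID -> stage name; TrieMap is safe to read from other threads
  private val running = TrieMap.empty[Int, String]

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    running.update(event.stageInfo.stageId, event.stageInfo.name)
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    running.remove(event.stageInfo.stageId)
  }

  // Immutable copy of the stages seen as running at the time of the call.
  def snapshot: Map[Int, String] = running.toMap
}
```

It could be registered once with
rdd.sparkContext.addSparkListener(new RunningStageTracker). Listener events
are delivered asynchronously, so the snapshot may lag slightly behind the
scheduler.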

TD

On Wed, May 13, 2015 at 1:25 PM, Du Li <l...@yahoo-inc.com> wrote:

> I call rdd.countApprox() and rdd.sparkContext.setJobGroup() inside
> dstream.foreachRDD{...}. After calling cancelJobGroup(), the Spark context
> seems to be no longer valid, which crashes subsequent jobs.
>
> My Spark version is 1.1.1. I will do more investigation into this issue,
> perhaps after upgrading to 1.3.1, and then file a JIRA if it persists.
>
> Is there a way to get the stage or task ID of a particular transformation
> or action on an RDD and then selectively kill the stage or tasks? It would
> be necessary and useful in situations similar to countApprox.
>
> Thanks,
> Du
>
>
>
>   On Wednesday, May 13, 2015 1:12 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>
> That is not supposed to happen :/ That is probably a bug.
> If you have the log4j logs, it would be good to file a JIRA. This may be
> worth debugging.
>
> On Wed, May 13, 2015 at 12:13 PM, Du Li <l...@yahoo-inc.com> wrote:
>
> Actually I tried that before asking. However, it killed the spark context.
> :-)
>
> Du
>
>
>
>   On Wednesday, May 13, 2015 12:02 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>
> That is a good question. I don't see a direct way to do that.
>
> You could try the following:
>
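> // Any unique ID works; here it is derived from the current time.
> val jobGroupId = s"countApprox-${System.currentTimeMillis()}"
> // setJobGroup also requires a description argument.
> rdd.sparkContext.setJobGroup(jobGroupId, "countApprox with timeout")
> // The job is launched within the job group set above.
> val approxCount = rdd.countApprox(5000).initialValue
> // Cancel whatever is still running in that group.
> rdd.sparkContext.cancelJobGroup(jobGroupId)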
>
>
> On Wed, May 13, 2015 at 11:24 AM, Du Li <l...@yahoo-inc.com> wrote:
>
> Hi TD,
>
> Do you know how to cancel the rdd.countApprox(5000) tasks after the
> timeout? Otherwise the job keeps running until completion, producing
> results that are never used while still consuming resources.
>
> Thanks,
> Du
>
>
>
>   On Wednesday, May 13, 2015 10:33 AM, Du Li <l...@yahoo-inc.com.INVALID>
> wrote:
>
>
>  Hi TD,
>
> Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming
> app stands a much better chance of completing each batch within the batch
> interval.
>
> Du
>
>
>   On Tuesday, May 12, 2015 10:31 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>
> From the code it seems that as soon as "rdd.countApprox(5000)" returns,
> you can call "pResult.initialValue" to get the approximate count at that
> point in time (that is, after the timeout). Calling
> "pResult.getFinalValue()" will block further until the job is over and
> give the final, exact value that you would have received from
> "rdd.count()".
>
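
A minimal sketch of the usage described above, assuming an existing RDD. The
object name ApproxCount and the method name within are hypothetical;
countApprox, initialValue, and BoundedDouble are the Spark API discussed in
this thread.

```scala
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD

// Hypothetical helper (not part of Spark).
object ApproxCount {
  // Blocks until the timeout expires or the count job finishes, whichever
  // comes first, and returns the estimate available at that moment.
  def within(rdd: RDD[_], timeoutMs: Long): BoundedDouble = {
    val pResult: PartialResult[BoundedDouble] = rdd.countApprox(timeoutMs)
    // initialValue does not wait for the remaining tasks, whereas
    // getFinalValue() would block until the whole job has completed.
    pResult.initialValue
  }
}
```

A call such as ApproxCount.within(rdd, 5000) returns a BoundedDouble whose
low, mean, and high fields bound the estimate. The tasks that are still
running keep going in the background after the call returns.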
> On Tue, May 12, 2015 at 5:03 PM, Du Li <l...@yahoo-inc.com.invalid> wrote:
>
> Hi,
>
> I tested the following in my streaming app and hoped to get an approximate
> count within 5 seconds. However, rdd.countApprox(5000).getFinalValue()
> always seemed to return only after the job had finished completely, just
> like rdd.count(), which often took more than 5 seconds. The values for
> low, mean, and high were the same.
>
> val pResult = rdd.countApprox(5000)
> val bDouble = pResult.getFinalValue()
> logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, " +
>   s"mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
>
> Could an expert here explain the right way to use it?
>
> Thanks,
> Du
>
>   On Wednesday, May 6, 2015 7:55 AM, Du Li <l...@yahoo-inc.com.INVALID>
> wrote:
>
>
> I have to count RDDs in a Spark Streaming app. When the data grows large,
> count() becomes expensive. Does anybody have experience using
> countApprox()? How accurate/reliable is it?
>
> The documentation is rather sparse. I assume the timeout parameter is in
> milliseconds. Can I retrieve the count value by calling getFinalValue()?
> Does it block and return only after the timeout? Or do I need to define
> onComplete/onFail handlers to extract the count value from the partial
> results?
>
> Thanks,
> Du
>
