Thanks Sebastian. I was indeed trying out FAIR scheduling with a high value for concurrentJobs today.
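For reference, this is roughly what I set up. The app name and the concurrentJobs value of 8 are just what I happened to try, and the fair-scheduler pool file is optional, so please treat this as a sketch rather than a recommendation:

    // Sketch of the configuration I tried; values are illustrative only.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("per-partition-isolation-test")      // placeholder name
      .set("spark.scheduler.mode", "FAIR")             // FAIR instead of the default FIFO
      .set("spark.streaming.concurrentJobs", "8")      // the "high value" I happened to pick
      // optionally: .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

    val ssc = new StreamingContext(conf, Seconds(2))   // 2 second batches, as in the scenario below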
It does improve the latency seen by the non-hot partitions, even if it does not provide complete isolation. So it might be an acceptable middle ground.

On 12 Feb 2016 12:18, "Sebastian Piu" <sebastian....@gmail.com> wrote:

> Have you tried using fair scheduler and queues?
>
> On 12 Feb 2016 4:24 a.m., "p pathiyil" <pathi...@gmail.com> wrote:
>
>> With this setting, I can see that the next job is being executed before the previous one is finished. However, the processing of the 'hot' partition eventually hogs all the concurrent jobs. If there were a way to restrict jobs to one per partition, this setting would provide the per-partition isolation.
>>
>> Is there anything in the framework which would give control over that aspect?
>>
>> Thanks.
>>
>> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> spark.streaming.concurrentJobs
>>>
>>> see e.g. http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>>
>>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil <pathi...@gmail.com> wrote:
>>>
>>>> Thanks for the response Cody.
>>>>
>>>> The producers are out of my control, so I can't really balance the incoming content across the various topics and partitions. The number of topics and partitions is quite large and the volume across them is not well known ahead of time, so it is quite hard to segregate low-volume and high-volume topics into separate driver programs.
>>>>
>>>> Will look at shuffle / repartition.
>>>>
>>>> Could you share the setting for starting another batch in parallel? It might be OK to call the 'save' of the processed messages out of order if that is the only consequence of this setting.
>>>>
>>>> When separate DStreams are created per partition (and union() is not called on them), what aspect of the framework still ties the scheduling of jobs across the partitions together? Asking this to see whether creating multiple threads in the driver and calling createDirectStream per partition in those threads can provide isolation.
>>>>
>>>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> The real way to fix this is by changing partitioning, so you don't have a hot partition. It would be better to do this at the time you're producing messages, but you can also do it with a shuffle / repartition during consuming.
>>>>>
>>>>> There is a setting to allow another batch to start in parallel, but that's likely to have unintended consequences.
>>>>>
>>>>> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil <pathi...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am looking at a way to isolate the processing of messages from each Kafka partition within the same driver.
>>>>>>
>>>>>> Scenario: A DStream is created with the createDirectStream call by passing in a few partitions. Let us say that the streaming context is defined to have a batch duration of 2 seconds. If the processing of messages from a single partition takes more than 2 seconds (while all the others finish much quicker), it seems that the next set of jobs gets scheduled only after the processing of that slow partition. This means that the delay affects all partitions and not just the partition that was truly the cause of it. What I would like to do is to have the delay impact only the 'slow' partition.
>>>>>>
>>>>>> Tried to create one DStream per partition and then do a union of all partitions (similar to the sample in http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times), but that didn't seem to help.
>>>>>>
>>>>>> Please suggest the correct approach to solve this issue.
>>>>>>
>>>>>> Thanks,
>>>>>> Praveen.
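P.S. For anyone finding this thread in the archives: the one-DStream-per-partition plus union() experiment mentioned in my first message below looked roughly like the sketch here. The topic name, partition count and starting offsets are made-up placeholders, and ssc is the streaming context from the earlier snippet. This is the variant that did *not* give me per-partition isolation; it is included only so the experiment is easy to reproduce.

    // Sketch only: one direct stream per TopicAndPartition, then a union.
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker
    val perPartitionStreams = (0 until 4).map { p =>                  // 4 partitions assumed
      val fromOffsets = Map(TopicAndPartition("mytopic", p) -> 0L)    // placeholder offsets
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets,
        (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
    }
    // Union the per-partition streams back into a single DStream.
    val unioned = ssc.union(perPartitionStreams)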