Hi Nicholas,

We have a high interruption rate (each worker host goes down at least twice
a week for maintenance), so our task failure rate was also high, especially
at our scale. This was particularly true for Flink, where replication is
not available and cascading task failures would ultimately cause the job to
fail. For Spark it is less of a concern since we replicate, but a task
still fails when a connection breaks to a worker that is no longer
available (of course, for Spark the replica is available, so it would not
cause an app failure).

There would be no extra cost for scenarios in which this feature is
disabled. The normal path would remain the same.

Regarding configuration changes, this feature would be enabled/disabled via
a new boolean flag (*celeborn.master.slot.assign.interruptionAware*). If
the feature is enabled, the percentile threshold I described in my previous
email would also be configurable
(*celeborn.master.slot.assign.interruptionAware.threshold*), with a default
of 50%.
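
For reference, enabling it would look roughly like this in the master's
configuration (illustrative only; the exact value format for the threshold,
e.g. percentage vs. fraction, would be finalized during implementation):

    celeborn.master.slot.assign.interruptionAware = true
    celeborn.master.slot.assign.interruptionAware.threshold = 50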

Thanks,
Aravind

On Wed, May 14, 2025 at 7:29 PM Nicholas Jiang <nicholasji...@apache.org>
wrote:

> Hi Aravind,
>
> Thanks for your detailed explanation.
>
> The mechanism of disruption aware slot selection makes sense to me. But
> I'm curious what the task failure rate was before you came up with this
> solution that yielded the substantial decrease (20x) in task failures. The
> task failure rate in our production environment is already very low,
> especially for Flink batch jobs without replica support. Meanwhile, are
> there any configuration changes for disruption aware slot selection? I did
> not see any configuration changes in this proposal.
>
> Regards,
> Nicholas Jiang
>
> On 2025/05/13 21:46:46 Aravind Patnam wrote:
> > Hi Nicholas,
> >
> > Thanks for the feedback and questions! Let me answer your questions
> > below.
> >
> > > 1. Why is the lack of Flink replication support the motivation for
> > > disruption aware slot selection? Do you mean that disruption aware slot
> > > selection helps to reduce recomputation costs for Flink without
> > > replication support?
> > Yes, this would significantly reduce recomputation costs. This is
> > especially true in an environment like ours, where a large set of workers
> > experiences constant interruptions throughout the day. By proactively
> > using only workers that won't get interrupted, we can minimize this
> > recomputation cost and reduce task failures.
> >
> > > 2. Can you provide a complete definition of the
> > > /updateInterruptionNotice interface? Meanwhile, could you also provide
> > > the definition of the corresponding CLI interface?
> > Yes, sure - I have updated the CIP google doc to now include the
> > interface definition:
> > https://docs.google.com/document/d/16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw/edit?pli=1&tab=t.0#heading=h.96cl46gz9o8l
> > In summary, it would be a PUT API, which would take in a list of
> > *UpdateInterruptionNoticeRequests*. This object's fields would simply be
> > the worker hostname along with the next known interruption timestamp in
> > milliseconds. Every time this API is called, it will automatically reset
> > any workers not included in the request back to no nextKnownDisruption.
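> >
> > For illustration, a request body could look roughly like the following
> > (the field names here are just a sketch based on the description above,
> > not the final schema):
> >
> >     PUT /updateInterruptionNotice
> >     [
> >       {"host": "worker-a.example.com", "nextKnownDisruption": 1747267200000},
> >       {"host": "worker-b.example.com", "nextKnownDisruption": 1747270800000}
> >     ]
> >
> > In this example, only worker-a and worker-b would carry a
> > nextKnownDisruption; all other workers would be reset.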
> >
> > > 3. How is the performance of disruption aware slot selection? In which
> > > scenarios could users use disruption aware slot selection?
> > Internally, we have enabled this feature and we see great improvements in
> > our task failure rate. You can see the results here:
> > https://docs.google.com/document/d/16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw/edit?pli=1&tab=t.0#bookmark=id.asvbrvv0c9q6
> > We roughly see a *20x decrease in Spark task failures related to
> > interruptions*. This is because we have a high rate of interruptions
> > internally. Other users with similar environments would greatly benefit
> > from this feature, as they can enable it to prioritize workers that will
> > not get interrupted.
> >
> > > 4. How could the cluster administrator determine
> > > workersWithLateInterruptions and workersWithEarlyInterruptions? BTW, how
> > > does the administrator evaluate the threshold percentile based on the
> > > range of interruption timestamps?
> > This sort of depends on how the interruptions are scheduled and how much
> > advance notice is given, along with the application characteristics in
> > the cluster. We plan on distinguishing the workers as follows in the
> > code:
> >
> >
> >    - prioritizedWorkers = workersWithNoInterruptions +
> >      workersWithLateInterruptions
> >    - deprioritizedWorkers = workersWithEarlyInterruptions
> >
> > The calculation is:
> >
> >    - workersWithLateInterruptions = percentageThreshold *
> >      sorted(totalWorkers)
> >    - workersWithEarlyInterruptions = totalWorkers -
> >      workersWithLateInterruptions
> >
> >
> > This means that the higher the percentage threshold, the larger
> > workersWithLateInterruptions becomes, and therefore the larger the set of
> > prioritizedWorkers.
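> >
> > As a rough illustration of that split (the names and signature here are
> > made up for this sketch, not the actual implementation):
> >
> >   // Partition workers by a percentile threshold on their next known
> >   // interruption timestamp; workers with no scheduled interruption are
> >   // always prioritized. percentageThreshold is a fraction, e.g. 0.5 for 50%.
> >   def partitionWorkers(
> >       nextInterruption: Map[String, Option[Long]], // worker -> next ts (ms)
> >       percentageThreshold: Double): (Seq[String], Seq[String]) = {
> >     val (noInterruption, withInterruption) =
> >       nextInterruption.toSeq.partition { case (_, ts) => ts.isEmpty }
> >     // Sort workers that do have a scheduled interruption, earliest first.
> >     val sorted = withInterruption.sortBy { case (_, ts) => ts.get }.map(_._1)
> >     // The latest fraction of the sorted list counts as "late" interruptions.
> >     val lateCount = (percentageThreshold * sorted.size).toInt
> >     val workersWithLateInterruptions = sorted.takeRight(lateCount)
> >     val workersWithEarlyInterruptions = sorted.dropRight(lateCount)
> >     val prioritizedWorkers = noInterruption.map(_._1) ++ workersWithLateInterruptions
> >     (prioritizedWorkers, workersWithEarlyInterruptions)
> >   }
> >
> > For the first example below (timestamps t5 - t14, threshold 50%), this
> > would put w6 - w10 in the prioritized set and w1 - w5 in the
> > deprioritized set.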
> >
> > For example, if you have a larger advance notice window (such as 1 week
> > or so), it makes sense to set this percentage threshold higher (e.g.
> > 90%). If you have a smaller advance notice window, it makes sense to set
> > it lower (e.g. 50%).
> >
> > Consider this example where there are 10 workers with interruption
> > timestamps t5-t14, and current time t0:
> >
> >    Worker:        w1  w2  w3  w4  w5  w6   w7   w8   w9   w10
> >    Interruption:  t5  t6  t7  t8  t9  t10  t11  t12  t13  t14
> >
> > If we set percentageThreshold to 50%, then the following would occur:
> >
> >    - w1 - w5 would be considered workersWithEarlyInterruptions
> >    - w6 - w10 would be considered workersWithLateInterruptions
> >
> > Since the current time t0 is still far from t5, we would probably be
> > wasting workers w1 - w5, as they still have a long time before they are
> > disrupted. Hence, in this case it would make more sense to set a higher
> > percentage threshold, such as 90%. Then the split would be:
> >
> >    - w1 would be considered workersWithEarlyInterruptions
> >    - w2 - w10 would be considered workersWithLateInterruptions
> >
> > This is better for the cluster given a larger advance notice window.
> >
> > However, if there is a smaller advance notice window, consider this example:
> >
> >    Worker:        w1  w2  w3  w4  w5  w6  w7  w8  w9  w10
> >    Interruption:  t1  t2  t3  t4  t5  t6  t7  t8  t9  t10
> >
> > If we set percentageThreshold to 90%, then the following would occur:
> >
> >    - w1 would be considered workersWithEarlyInterruptions
> >    - w2 - w10 would be considered workersWithLateInterruptions
> >
> > Since the current time t0 is close to t1 and t2, medium/long running jobs
> > would probably incur failures when w2 - w10 go down. Hence, in this case
> > it might be better to set a lower percentage threshold, such as 50%. Then
> > the split would be:
> >
> >    - w1 - w5 would be considered workersWithEarlyInterruptions
> >    - w6 - w10 would be considered workersWithLateInterruptions
> >
> > This is better for the cluster given a smaller advance notice window.
> >
> > Overall, the cluster administrator should take into account the
> > interruption advance notice time, along with the average/median job
> > runtime in the cluster, before setting this value.
> > We can add more documentation on how to set this value once we start
> > implementing it.
> >
> > Hope this answers all your questions!
> >
> > Thanks,
> > Aravind
> >
> >
> > On Tue, May 13, 2025 at 9:40 AM Nicholas Jiang <nicholasji...@apache.org>
> > wrote:
> >
> > > Hi Aravind,
> > >
> > > Thanks for driving the proposal of interruption aware slot selection. I
> > > have some comments for this proposal:
> > >
> > > 1. Why is the lack of Flink replication support the motivation for
> > > disruption aware slot selection? Do you mean that disruption aware slot
> > > selection helps to reduce recomputation costs for Flink without
> > > replication support?
> > >
> > > 2. Can you provide a complete definition of the
> > > /updateInterruptionNotice interface? Meanwhile, could you also provide
> > > the definition of the corresponding CLI interface?
> > >
> > > 3. How is the performance of disruption aware slot selection? In which
> > > scenarios could users use disruption aware slot selection?
> > >
> > > 4. How could the cluster administrator determine
> > > workersWithLateInterruptions and workersWithEarlyInterruptions? BTW, how
> > > does the administrator evaluate the threshold percentile based on the
> > > range of interruption timestamps?
> > >
> > > Regards,
> > > Nicholas Jiang
> > >
> > > On 2025/05/02 07:40:15 Aravind Patnam wrote:
> > > > Hi Celeborn community,
> > > >
> > > > I have written up CIP-17: Interruption Aware Slot Selection:
> > > > https://docs.google.com/document/d/16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw/edit?usp=sharing
> > > > Please review and let me know if there are any comments or questions.
> > > >
> > > > This is a feature we have introduced internally, given our heavy
> > > > volume of interruptions. We have seen a substantial decrease in task
> > > > failures in both Flink and Spark jobs, and think the community would
> > > > also benefit from this :)
> > > >
> > > > Looking forward to getting feedback from the community.
> > > >
> > > > Thanks,
> > > > Aravind
> > > >
> > > >  CIP 17: Interruption Aware Slot Selection
> > > > https://drive.google.com/open?id=16Lj4KadSb6ypaXTg5tJB0QvaXG8vTLtyoj7V4umTZqw
> > > >
> > >
> >
> >
> > --
> > Aravind K. Patnam
> >
>


-- 
Aravind K. Patnam
