This is essentially what the cluster schedulers do: allow different people to 
submit work with different credentials and priority; cgroups & equivalent to 
limit granted resources to requested ones. If you have pre-emption enabled, you 
can even have one job kill work off the others. Spark does recognise 
pre-emption failures and doesn't treat it as a sign of problems in the 
executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale 
computing —if you are curious about what is state of the art, look at the 
Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google 
work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just 
meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% 
drop in throughput would be.

I would strongly encourage you to avoid this topic, unless you want dive deep 
into the whole world of cluster scheduling, the debate over centralized vs 
decentralized, the idelogical one of "should services ever get allocated 
RAM/CPU in times of low overall load?", the challenge of swap, or more 
specifically, "how do you throttle memory consumption", as well as what to do 
when the IO load of a service is actually incurred on a completely different 
host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the 
Hadoop tree and look at 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux
 for the cgroup support, and then 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl
 for the native code needed alongside this. Then consider that it's not just a 
matter of writing something similar, it's getting an OSS project to actually 
commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark 
instances for the different workloads, with different CPU & memory limits, 
queue priorities, etc. Other people have done the work, written the tests, 
deployed it in production, met their own SLAs *and are therefore committed to 
maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis 
<theg...@trilliumit.com<mailto:theg...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the 
scheduling goal I have in mind, I hastily jumped straight into a technical 
solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the 
standalone scheduler. Perhaps you could/should classify it as a different 
scheduler, but I don't want to give the impression that this will be as 
difficult to implement as most schedulers are. Initially, from a memory 
perspective, we would still allocate in a FIFO manner. This new scheduling mode 
(or new scheduler, if you'd rather) would mostly benefit any users with 
small-ish clusters, both on-premise and cloud based. Essentially, my end goal 
is to be able to run multiple *applications* simultaneously with each 
application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a 
second application that I'd like to run hourly. The hourly application is more 
time critical (higher priority), so I'd like it to finish in a small amount of 
time. If I allow the first app to run with all cores (this takes several days 
on my 64 core cluster), then nothing else can be executed when running with the 
default FIFO scheduler. All of the cores have been allocated to the first 
application, and it will not release them until it is finished. Dynamic 
allocation does not help in this case, as there is always a backlog of tasks to 
run until the first application is nearing the end anyway. Naturally, I could 
just limit the number of cores that the first application has access to, but 
then I have idle cpu time when the second app is not running, and that is not 
optimal. Secondly in that case, the second application only has access to the 
*leftover* cores that the first app has not allocated, and will take a 
considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running 
without specifying the number of cores they want to utilize (whether 
intentionally or not). As I'm sure you know, the default is to allocate the 
entire cluster to this application. The cores allocated to this shell are 
unavailable to other applications, even if they are just sitting idle while a 
developer is getting their environment set up to run a very big job 
interactively. Other developers that would like to launch interactive shells 
are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many 
simultaneous applications to be running as the cluster memory (still statically 
partitioned, at least initially) will allow. Applications could be configured 
with a "cpu shares" parameter (just an arbitrary integer relative only to other 
applications) which is essentially just passed through to the linux cgroup 
cpu.shares setting. Since each executor of an application on a given worker 
runs in it's own process/jvm, then that process could be easily be placed into 
a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that 
processes competing for cpu time are allocated a percentage of time equal to 
their share count as a percentage of all shares in that level of the cgroup 
hierarchy. If two applications are both scheduled on the same core with the 
same weight, each will get to utilize 50% of the time on that core. This is all 
built into the kernel, and the only thing the spark worker has to do is create 
a cgroup for each application, set the cpu.shares parameter, and assign the 
executors for that application to the new cgroup. If multiple executors are 
running on a single worker, for a single application, the cpu time available to 
that application is divided among each of those executors equally. The default 
for cpu.shares is that they are not limiting in any way. A process can consume 
all available cpu time if it would otherwise be idle anyway.


That's the issue that surfaces in google papers: should jobs get idle capacity. 
Current consensus is "no". Why not? Because you may end up writing an 
SLA-sensitive app which just happens to meet it's SLAs in times of light 
cluster load, but precisely when the cluster is busy, it suddenly slows down, 
leading to stress all round, in the "why is this service suddenly unusable" 
kind of stress. Instead you keep the cluster busy with low priority preemptible 
work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to 
some abstraction) is that cpu share allocations are heterogeneous to all 
processes running on a machine. An admin could have very fine grained control 
over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive 
application could utilize 100% of all cluster cores if they are idle. Then my 
time sensitive app could be launched with nine times the priority and the linux 
kernel would scale back the first application to 10% of all cores (completely 
seemlessly and automatically: no pre-emption, just fewer time slices of cpu 
allocated by the kernel to the first application), while the second application 
gets 90% of all the cores until it completes.


FWIW, it's often memory consumption that's most problematic here. If one 
process starts to swap, it hurts everything else. But Java isn't that good at 
handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode 
would create an increase in context switching on each host. This issue is 
somewhat mitigated by still statically allocating memory however, since there 
wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. 
Linux cgroups allow you to control access to more than just cpu shares. You can 
apply the same concept to other resources (memory, disk io). You can also set 
up hard limits so that an application will never get more than is allocated to 
it. I know that those limitations are important for some use cases involving 
predictability of application execution times. Eventually, this idea could be 
expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a 
bit better now. Does anyone else can see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to 
show just how complex it gets in a large system. Before you begin to write a 
line of code, I'd recommend

-you read as much of the published work as you can, including the google and 
microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, 
that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre a complex problem that is still 
considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve


Reply via email to