
What about YARN or Mesos used in combination with Spark? They also have cgroups.
Or a Kubernetes (or similar) deployment?

> Hello Spark Devs,
> I have finally completed a mostly working proof of concept. I do not want to 
> create a pull request for this code, as I don't believe it's production 
> worthy at the moment. My intent is to better communicate what I'd like to 
> accomplish. Please review the following patch: 
> https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.
> What the code does:
> Currently, it exposes two options "spark.cgroups.enabled", which defaults to 
> false, and "spark.executor.shares" which defaults to None. When cgroups mode 
> is enabled, a single executor is created on each worker, with access to all 
> cores. The worker will create a parent cpu cgroup (on first executor launch) 
> called "spark-worker" to house any executors that it launches. Each executor 
> is put into its own cgroup named with the app id, under the parent cgroup.
> The cpu.shares parameter is set to the value in "spark.executor.shares"; if
> this is "None", it inherits the value from the parent cgroup.
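As a rough illustration of the layout described above (not the code in the linked patch), the per-application cgroup could be set up along these lines. This is a minimal sketch assuming a cgroups v1 cpu hierarchy; the function names and the `cgroup_root` parameter are hypothetical, and on a real host `cgroup_root` would be the cpu controller mount point (commonly /sys/fs/cgroup/cpu, which is also an assumption here).

```python
import os
from typing import Optional


def create_executor_cgroup(cgroup_root: str, app_id: str,
                           shares: Optional[int] = None,
                           parent: str = "spark-worker") -> str:
    """Create <cgroup_root>/<parent>/<app_id> and optionally set cpu.shares."""
    path = os.path.join(cgroup_root, parent, app_id)
    # makedirs also creates the parent "spark-worker" group on first use.
    os.makedirs(path, exist_ok=True)
    if shares is not None:
        # Mirrors "spark.executor.shares"; None leaves the file untouched.
        with open(os.path.join(path, "cpu.shares"), "w") as f:
            f.write(str(shares))
    return path


def attach_process(cgroup_path: str, pid: int) -> None:
    """Move a process into the cgroup by writing its pid to cgroup.procs."""
    with open(os.path.join(cgroup_path, "cgroup.procs"), "w") as f:
        f.write(str(pid))
```

On a real cgroup mount the kernel creates the control files itself, and both writes need permission on the hierarchy.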
> Tested on Ubuntu 16.04 (docker containers), kernel 4.4.0-53-generic: I have
> not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to 
> change this code base, but it looks like the kernel interface is the same for 
> the most part.
> I was able to launch a spark shell which consumed all cores in the cluster, 
> but sat idle. I was then able to launch an application (client deploy-mode) 
> which was also allocated all cores in the cluster, and ran to completion 
> unhindered. Each of the executors on each worker was properly placed into 
> its respective cgroup, which in turn had the correct cpu.shares value
> allocated.
> What the code still needs:
> * Documentation (assuming the community moves forward with some kind of 
> implementation)
> * Sometimes the cgroups get destroyed after app completion, sometimes they 
> don't. (Need to put the `.destroy()` call in a `finally` block, or in the
> `maybeCleanupApplication()` method; what do you think?)
> * Proper handling of the driver's resources when running `--deploy-mode cluster`
> * Better web UI indication of cgroup mode or core sharing (currently just 
> shows up as an over allocation of cores per worker)
> * Better environment/os/platform detection and testing (I won't be surprised 
> if there is something broken if trying to run this on a different OS)
> * Security/permissions for cgroups if running worker as non-root (perhaps 
> creating the parent cgroup with correct permissions before launching the 
> worker is all that is necessary)
>   - running the worker in a container currently requires --privileged mode (I 
> haven't figured out if/what capability makes cgroups writable, or if it's 
> possible to use a new cgroup mount point)
> * More user defined options
>   - cgroup root path (currently hard coded)
>   - driver cpu.shares (for cluster deploy-mode: would require a specially 
> named cgroup... s"$appId-driver" ? default same #shares as executor? default 
> double shares?)
>   - parent cpu.shares (currently os default)
>   - parent cgroup name (currently hard coded)
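On the cleanup item in the list above: one way to make removal unconditional is to tie the cgroup's lifetime to a `finally` block. The following is only a sketch of that idea in Python, not the patch's Scala code; `app_cgroup` is a hypothetical helper, and it assumes the cgroup is an empty directory by the time the block exits (on a real cgroup mount, `rmdir` fails while processes are still attached).

```python
import os
from contextlib import contextmanager


@contextmanager
def app_cgroup(path: str):
    """Create a cgroup directory and remove it however the body exits."""
    os.makedirs(path, exist_ok=True)
    try:
        yield path
    finally:
        # Runs on normal completion and on exceptions alike.
        os.rmdir(path)
```

A caller would wrap the executor's lifetime in `with app_cgroup(path): ...`, so a crash in the body still removes the group.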
> I tried to structure the initial concept to make it easy to add support for 
> more cgroup features (cpuset, mem, etc...), should the community feel there 
> is value in adding them. Linux cgroups are an extremely powerful resource 
> allocation and isolation tool, and this patch is only scratching the surface 
> of their general capabilities. Of course, as Mr. Loughran points out,
> expanding into these features will require more code maintenance, but not 
> enough that we should shy away from it.
> <opinion>
> I personally believe that any multi-node resource allocation system should 
> offload as much of the scheduling and resource allocation as possible to the 
> underlying kernel within the node level. Each node's own kernel is the best 
> equipped place to manage those resources. Only the node's kernel can allocate 
> a few seconds worth of cpu to the low priority app, while the high priority 
> app is waiting on disk I/O, and instantly give it back to the high priority 
> app when it needs it, with (near) real-time granularity.
> The multi-node system should set up a proper framework to give each node's 
> kernel the information it needs to allocate the resources correctly. 
> Naturally, the system should allow resource reservations, and even limits, 
> for the purposes of meeting and testing for SLAs and worst case scenarios as 
> well. Linux cgroups are capable of doing those things in a near real-time 
> fashion.
> With a proper convention of priorities/shares for applications within an 
> organization, I believe that everyone can get better throughput out of their 
> hardware, at any cluster size. But, alas, *that* is not a problem I'm trying 
> to solve currently.
> </opinion>
> Sorry that the patch is pretty rough still, as I'm still getting my head 
> wrapped around spark's code base structure. Looking forward to any feedback.
> Thanks,
> Travis
> Steve,
> I appreciate your experience and insight when dealing with large clusters at 
> the data-center scale. I'm also well aware of the complex nature of 
> schedulers, and that it is an area of ongoing research being done by 
> people/companies with many more resources than I have. This might explain my 
> apprehension in even calling this idea a *scheduler*: I wanted to avoid this 
> exact kind of debate over what I want to accomplish. This is also why I 
> mentioned that this idea will mostly benefit users with small clusters.
> I've used many of the big named "cluster schedulers" (YARN, Mesos, and 
> Kubernetes) and the main thing that they have in common is that they don't 
> work well for my use case. Those systems are designed for large scale 1000+ 
> node clusters, and become painful to manage in the small cluster range. Most 
> of the tools that we've attempted to use don't work well for us, so we've 
> written several of our own: https://github.com/trilliumit/.
> It can be most easily stated by the fact that *we are not* Google, Facebook, 
> or Amazon: we don't have a *data-center* of servers to manage, we barely have 
> half of a rack. *We are not trying to solve the problem that you are 
> referring to*. We are operating at a level that if we aren't meeting SLAs, 
> then we could just buy another server to add to the cluster. I imagine that 
> we are not alone in that fact either; I've seen that many of the questions on
> SO and on the user list are from others operating at a level similar to ours.
> I understand that pre-emption isn't inherently a bad thing, and that these 
> multi-node systems typically handle it gracefully. However, if idle CPU is 
> expensive, then how much more does wasted CPU cost when a nearly complete 
> task is pre-empted and has to be started over? Fortunately for me, that isn't 
> a problem that I have to solve at the moment.
> >Instead? Use a multi-user cluster scheduler and spin up different spark 
> >instances for the different workloads
> See my above comment on how well these cluster schedulers work for us. I have 
> considered the avenue of multiple spark clusters, and in reality the 
> infrastructure we have set up would allow me to do this relatively easily. In 
> fact, in my environment, this is a similar solution to what I'm proposing, 
> just managed one layer up the stack and with less flexibility. I am trying to 
> avoid this solution however because it does require more overhead and 
> maintenance. What if I want two spark apps to run on the same cluster at the 
> same time, sharing the available CPU capacity equally? I can't accomplish 
> that easily with multiple spark clusters. Also, we are a 1 to 2 man operation 
> at this point; I don't have teams of ops people to task with managing as many
> spark clusters as I feel like launching.
> >FWIW, it's often memory consumption that's most problematic here.
> Perhaps in the use-cases you have experience with, but not currently in mine. 
> In fact, my initial proposal is not yet changing the allocation of memory as
> a resource. This would still be statically allocated in a FIFO manner as long 
> as memory is available on the cluster, the same way it is now.
> >I would strongly encourage you to avoid this topic
> Thanks for the suggestion, but I will choose how I spend my time. If I can 
> find a simple solution to a problem that I face, and I'm willing to share 
> that solution, I'd hope one would encourage that instead.
> Perhaps I haven't yet clearly communicated what I'm trying to do. In short, 
> *I am not trying to write a scheduler*: I am trying to slightly (and 
> optionally) tweak the way executors are allocated and launched, so that I can 
> more intuitively and more optimally utilize my small spark cluster.
> Thanks,
> Travis
> This is essentially what the cluster schedulers do: allow different people to 
> submit work with different credentials and priority; cgroups & equivalent to 
> limit granted resources to requested ones. If you have pre-emption enabled, 
> you can even have one job kill work off the others. Spark does recognise 
> pre-emption failures and doesn't treat them as a sign of problems in the
> executor, that is: it doesn't over-react.
> Cluster scheduling is one of the cutting edge bits of datacentre-scale
> computing; if you are curious about what is the state of the art, look at the
> Morning Paper https://blog.acolyer.org/ for coverage last week of MS and
> Google work there. YARN, Mesos, Borg, whatever Amazon use: at scale it's not
> just meeting SLAs, it's about how much idle CPU costs, and how expensive even
> a 1-2% drop in throughput would be.
> I would strongly encourage you to avoid this topic, unless you want to dive
> deep into the whole world of cluster scheduling, the debate over centralized
> vs decentralized, the ideological one of "should services ever get allocated
> RAM/CPU in times of low overall load?", the challenge of swap, or more 
> specifically, "how do you throttle memory consumption", as well as what to do 
> when the IO load of a service is actually incurred on a completely different 
> host from the one your work is running on. 
> There's also a fair amount of engineering work; to get a hint download the 
> Hadoop tree and look at 
> hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux
>  for the cgroup support, and then 
> hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl
>  for the native code needed alongside this. Then consider that it's not just 
> a matter of writing something similar, it's getting an OSS project to 
> actually commit to maintaining such code after you provide that initial 
> contribution.
> Instead? Use a multi-user cluster scheduler and spin up different spark 
> instances for the different workloads, with different CPU & memory limits, 
> queue priorities, etc. Other people have done the work, written the tests, 
> deployed it in production, met their own SLAs *and are therefore committed to 
> maintaining this stuff*.
> -Steve
>> My apologies, in my excitement of finding a rather simple way to accomplish 
>> the scheduling goal I have in mind, I hastily jumped straight into a 
>> technical solution, without explaining that goal, or the problem it's 
>> attempting to solve.
>> You are correct that I'm looking for an additional running mode for the 
>> standalone scheduler. Perhaps you could/should classify it as a different 
>> scheduler, but I don't want to give the impression that this will be as 
>> difficult to implement as most schedulers are. Initially, from a memory 
>> perspective, we would still allocate in a FIFO manner. This new scheduling 
>> mode (or new scheduler, if you'd rather) would mostly benefit any users with 
>> small-ish clusters, both on-premise and cloud based. Essentially, my end 
>> goal is to be able to run multiple *applications* simultaneously with each 
>> application having *access* to the entire core count of the cluster.
>> I have a very cpu intensive application that I'd like to run weekly. I have 
>> a second application that I'd like to run hourly. The hourly application is 
>> more time critical (higher priority), so I'd like it to finish in a small 
>> amount of time. If I allow the first app to run with all cores (this takes 
>> several days on my 64 core cluster), then nothing else can be executed when 
>> running with the default FIFO scheduler. All of the cores have been 
>> allocated to the first application, and it will not release them until it is 
>> finished. Dynamic allocation does not help in this case, as there is always 
>> a backlog of tasks to run until the first application is nearing the end 
>> anyway. Naturally, I could just limit the number of cores that the first 
>> application has access to, but then I have idle cpu time when the second app 
>> is not running, and that is not optimal. Secondly in that case, the second 
>> application only has access to the *leftover* cores that the first app has 
>> not allocated, and will take a considerably longer amount of time to run.
>> You could also imagine a scenario where a developer has a spark-shell 
>> running without specifying the number of cores they want to utilize (whether 
>> intentionally or not). As I'm sure you know, the default is to allocate the 
>> entire cluster to this application. The cores allocated to this shell are 
>> unavailable to other applications, even if they are just sitting idle while 
>> a developer is getting their environment set up to run a very big job 
>> interactively. Other developers that would like to launch interactive shells 
>> are stuck waiting for the first one to exit their shell.
>> My proposal would eliminate this static nature of core counts and allow as 
>> many simultaneous applications to be running as the cluster memory (still 
>> statically partitioned, at least initially) will allow. Applications could 
>> be configured with a "cpu shares" parameter (just an arbitrary integer 
>> relative only to other applications) which is essentially just passed 
>> through to the linux cgroup cpu.shares setting. Since each executor of an 
>> application on a given worker runs in its own process/JVM, that
>> process could easily be placed into a cgroup created and dedicated for
>> that application.
>> Linux cgroups cpu.shares are pretty well documented, but the gist is that 
>> processes competing for cpu time are allocated a percentage of time equal to 
>> their share count as a percentage of all shares in that level of the cgroup 
>> hierarchy. If two applications are both scheduled on the same core with the 
>> same weight, each will get to utilize 50% of the time on that core. This is 
>> all built into the kernel, and the only thing the spark worker has to do is 
>> create a cgroup for each application, set the cpu.shares parameter, and 
>> assign the executors for that application to the new cgroup. If multiple 
>> executors are running on a single worker, for a single application, the cpu 
>> time available to that application is divided equally among those
>> executors. The default for cpu.shares is that they are not limiting in any
>> way. A process can consume all available cpu time if it would otherwise be 
>> idle anyway.
> That's the issue that surfaces in google papers: should jobs get idle 
> capacity. Current consensus is "no". Why not? Because you may end up writing 
> an SLA-sensitive app which just happens to meet its SLAs in times of light
> cluster load, but precisely when the cluster is busy, it suddenly slows down,
> leading to stress all round, of the "why is this service suddenly unusable"
> kind. Instead you keep the cluster busy with low priority
> preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.
>> Another benefit to passing cpu.shares directly to the kernel (as opposed to 
>> some abstraction) is that cpu share allocations are heterogeneous to all 
>> processes running on a machine. An admin could have very fine grained 
>> control over which processes get priority access to cpu time, depending on 
>> their needs.
>> To continue my personal example above, my long running cpu intensive 
>> application could utilize 100% of all cluster cores if they are idle. Then 
>> my time sensitive app could be launched with nine times the priority and the 
>> linux kernel would scale back the first application to 10% of all cores 
>> (completely seamlessly and automatically: no pre-emption, just fewer time
>> slices of cpu allocated by the kernel to the first application), while the 
>> second application gets 90% of all the cores until it completes.
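The arithmetic behind that example can be sketched as follows. This is an idealised model of cpu.shares under full contention, not a measurement: the share values are illustrative (1024 is the usual cgroups v1 default, the other is nine times that), and `cpu_fractions` and the app names are hypothetical.

```python
from typing import Dict, Set


def cpu_fractions(shares: Dict[str, int], busy: Set[str]) -> Dict[str, float]:
    """Fraction of CPU each cgroup gets when the busy ones compete.

    Only cgroups with runnable work are counted; an idle group's shares
    are ignored, so its time goes to the others.
    """
    total = sum(shares[name] for name in busy)
    return {name: (shares[name] / total if name in busy else 0.0)
            for name in shares}


shares = {"weekly": 1024, "hourly": 9 * 1024}
print(cpu_fractions(shares, busy={"weekly"}))            # weekly app alone
print(cpu_fractions(shares, busy={"weekly", "hourly"}))  # both competing
```

With only the weekly app busy it gets the whole machine; once both are runnable the split is 10% to 90%, matching the figures quoted above.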
> FWIW, it's often memory consumption that's most problematic here. If one 
> process starts to swap, it hurts everything else. But Java isn't that good at 
> handling limited heap/memory size; you have to spec that heap up front. 
>> The only downside that I can think of currently is that this scheduling mode 
>> would create an increase in context switching on each host. This issue is 
>> somewhat mitigated by still statically allocating memory however, since 
>> there wouldn't typically be an exorbitant number of applications running at 
>> once.
>> In my opinion, this would allow the most optimal usage of cluster resources. 
>> Linux cgroups allow you to control access to more than just cpu shares. You 
>> can apply the same concept to other resources (memory, disk io). You can 
>> also set up hard limits so that an application will never get more than is 
>> allocated to it. I know that those limitations are important for some use 
>> cases involving predictability of application execution times. Eventually, 
>> this idea could be expanded to include many more of the features that 
>> cgroups provide.
>> Thanks again for any feedback on this idea. I hope that I have explained it 
>> a bit better now. Can anyone else see value in it?
> I'm not saying "don't get involved in the scheduling problem"; I'm trying to 
> show just how complex it gets in a large system. Before you begin to write a 
> line of code, I'd recommend
> -you read as much of the published work as you can, including the google and 
> microsoft papers, Facebook's FairScheduler work, etc, etc.
> -have a look at the actual code inside those schedulers whose source is 
> public, that's YARN and Mesos.
> -try using these schedulers for your own workloads.
> Really: scheduling work across a datacentre is a complex problem that is still
> considered a place for cutting-edge research. Avoid unless you want to do 
> that.
> -Steve

