Thanks for the response Jörn, This patch is intended only for spark standalone.
My understanding of the YARN cgroup support is that it only limits cpu, rather than allocates it based on the priority or shares system. This could be old documentation that I'm remembering, however. Another issue with YARN is that it has a lot more overhead than standalone mode, and always seemed a bit less responsive in general. Lastly, I remember struggling greatly with yet another resource abstraction layer (as if spark doesn't have enough already), it still statically allocated cores (albeit virtual ones), and it was much more cumbersome to find a proper balance of resources to request for an app. My experience in trying to accomplish something like this in Mesos was always met with frustration because the system still statically allocated cores away to be reserved by individual apps. Trying to adjust the priority of individual applications was only possible by increasing the core count, further starving other apps of available cores. It was impossible to give a priority lower than the default to an app. The cpu.shares parameter was abstracted away as a multiple of the number of requested cores, which had a double down affect on the app: not only was it given more cores, it was also given a higher priority to run on them. Perhaps this has changed in more recent versions, but this was my experience when testing it. I'm not familiar with a spark scheduler for kubernetes, unless you mean to launch a standalone cluster in containers with kubernetes? In that case, this patch would simply divvy up the resources allocated to the spark-worker container among each of it's executors, based on the shares that each executor is given. This is similar to how my current environment works, I'm just not using kubernetes as a container launcher. I found kubernetes was quite limiting in the way we wanted our network to be structured, and it also seemed quite difficult to get new functionality exposed in the form of their yaml API system. My goal with this patch is to essentially eliminate the static allocation of cpu cores at all. Give each app time on the cpu equal to the number of shares it has as a percentage of the total pool. Thanks, Travis ________________________________ From: Jörn Franke <jornfra...@gmail.com> Sent: Thursday, December 15, 2016 12:48 To: Hegner, Travis Cc: Apache Spark Dev Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups. Hi, What about yarn or mesos used in combination with Spark. The have also cgroups. Or a kubernetes etc deployment. On 15 Dec 2016, at 17:37, Hegner, Travis <theg...@trilliumit.com<mailto:theg...@trilliumit.com>> wrote: Hello Spark Devs, I have finally completed a mostly working proof of concept. I do not want to create a pull request for this code, as I don't believe it's production worthy at the moment. My intent is to better communicate what I'd like to accomplish. Please review the following patch: https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler. What the code does: Currently, it exposes two options "spark.cgroups.enabled", which defaults to false, and "spark.executor.shares" which defaults to None. When cgroups mode is enabled, a single executor is created on each worker, with access to all cores. The worker will create a parent cpu cgroup (on first executor launch) called "spark-worker" to house any executors that it launches. Each executor is put into it's own cgroup named with the app id, under the parent cgroup. The cpu.shares parameter is set to the value in "spark.executor.shares", if this is "None", it inherits the value from the parent cgroup. Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I have not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to change this code base, but it looks like the kernel interface is the same for the most part. I was able to launch a spark shell which consumed all cores in the cluster, but sat idle. I was then able to launch an application (client deploy-mode) which was also allocated all cores in the cluster, and ran to completion unhindered. Each of the executors on each worker was properly placed into it's respective cgroup, which in turn had the correct cpu.shares value allocated. What the code still needs: * Documentation (assuming the community moves forward with some kind of implementation) * Sometimes the cgroups get destroyed after app completion, sometimes they don't. (need to put `.destroy()` call in a `finally` block., or in the `maybeCleanupApplication()` method; what do you think?) * Proper handling of drivers's resources when running `--deploy-mode cluster` * Better web UI indication of cgroup mode or core sharing (currently just shows up as an over allocation of cores per worker) * Better environment/os/platform detection and testing (I won't be surprised if there is something broken if trying to run this on a different OS) * Security/permissions for cgroups if running worker as non-root (perhaps creating the parent cgroup with correct permissions before launching the worker is all that is necessary) - running the worker in a container currently requires --privileged mode (I haven't figured out if/what capability makes cgroups writable, or if it's possible to use a new cgroup mount point) * More user defined options - cgroup root path (currently hard coded) - driver cpu.shares (for cluster deploy-mode: would require a specially named cgroup... s"$appId-driver" ? default same #shares as executor? default double shares? - parent cpu.shares (currently os default) - parent cgroup name (currently hard coded) I tried to structure the initial concept to make it easy to add support for more cgroup features (cpuset, mem, etc...), should the community feel there is value in adding them. Linux cgroups are an extremely powerful resource allocation and isolation tool, and this patch is only scratching the surface of their general capabilities. Of course, as Mr. Loughran's points out, expanding into these features will require more code maintenance, but not enough that we should shy away from it. <opinion> I personally believe that any multi-node resource allocation system should offload as much of the scheduling and resource allocation as possible to the underlying kernel within the node level. Each node's own kernel is the best equipped place to manage those resources. Only the node's kernel can allocate a few seconds worth of cpu to the low priority app, while the high priority app is waiting on disk I/O, and instantly give it back to the high priority app when it needs it, with (near) real-time granularity The multi-node system should set up a proper framework to give each node's kernel the information it needs to allocate the resources correctly. Naturally, the system should allow resource reservations, and even limits, for the purposes of meeting and testing for SLAs and worst case scenarios as well. Linux cgroups are capable of doing those things in a near real-time fashion. With a proper convention of priorities/shares for applications within an organization, I believe that everyone can get better throughput out of their hardware, at any cluster size. But, alas, *that* is not a problem I'm trying to solve currently. </opinion> Sorry that the patch is pretty rough still, as I'm still getting my head wrapped around spark's code base structure. Looking forward to any feedback. Thanks, Travis ________________________________ From: Hegner, Travis <theg...@trilliumit.com<mailto:theg...@trilliumit.com>> Sent: Tuesday, December 6, 2016 10:49 To: Steve Loughran Cc: Shuai Lin; Apache Spark Dev Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups. Steve, I appreciate your experience and insight when dealing with large clusters at the data-center scale. I'm also well aware of the complex nature of schedulers, and that it is an area of ongoing research being done by people/companies with many more resources than I have. This might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this exact kind of debate over what I want to accomplish. This is also why I mentioned that this idea will mostly benefit users with small clusters. I've used many of the big named "cluster schedulers" (YARN, Mesos, and Kubernetes) and the main thing that they have in common is that they don't work well for my use case. Those systems are designed for large scale 1000+ node clusters, and become painful to manage in the small cluster range. Most of the tools that we've attempted to use don't work well for us, so we've written several of our own: https://github.com/trilliumit/. It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not trying to solve the problem that you are referring to*. We are operating at a level that if we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine that we are not alone in that fact either, I've seen that many of the questions on SO and on the user list are from others operating at a level similar to ours. I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems typically handle it gracefully. However, if idle CPU is expensive, then how much more does wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately for me, that isn't a problem that I have to solve at the moment. >Instead? Use a multi-user cluster scheduler and spin up different spark >instances for the different workloads See my above comment on how well these cluster schedulers work for us. I have considered the avenue of multiple spark clusters, and in reality the infrastructure we have set up would allow me to do this relatively easily. In fact, in my environment, this is a similar solution to what I'm proposing, just managed one layer up the stack and with less flexibility. I am trying to avoid this solution however because it does require more overhead and maintenance. What if I want two spark apps to run on the same cluster at the same time, sharing the available CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I don't have teams of ops people to task with managing as many spark clusters as I feel like launching. >FWIW, it's often memory consumption that's most problematic here. Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my initial proposal is net yet changing the allocation of memory as a resource. This would still be statically allocated in a FIFO manner as long as memory is available on the cluster, the same way it is now. >I would strongly encourage you to avoid this topic Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage that instead. Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are allocated and launched, so that I can more intuitively and more optimally utilize my small spark cluster. Thanks, Travis ________________________________ From: Steve Loughran <ste...@hortonworks.com<mailto:ste...@hortonworks.com>> Sent: Tuesday, December 6, 2016 06:54 To: Hegner, Travis Cc: Shuai Lin; Apache Spark Dev Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups. This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react. cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be. I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on. There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution. Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*. -Steve On 5 Dec 2016, at 15:36, Hegner, Travis <theg...@trilliumit.com<mailto:theg...@trilliumit.com>> wrote: My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve. You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster. I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run. You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell. My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application. Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway. That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc. Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs. To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes. FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front. The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once. In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide. Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it? I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend -you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc. -have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos. -try using these schedulers for your own workloads. really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that. -Steve