Sorry. Let me share Kafka benchmark results again.

| # of topics | Kafka TPS |
| ----------- | --------- |
| 50          | 34,488    |
| 100         | 34,502    |
| 200         | 31,781    |
| 500         | 30,324    |
| 1000        | 30,855    |
Best regards
Dominic

2018-06-07 2:04 GMT+09:00 Dominic Kim <[email protected]>:

> Sorry for the late response, Tyson.
>
> Let me first answer your second question.
> Vuser is just the number of threads sending the requests.
> Each Vuser randomly picked a namespace and sent the request using the REST
> API. So they are independent of the number of namespaces.
>
> And regarding performance degradation with the number of users, I think it
> works a little differently.
> Even if I have only 2 users (namespaces), if their homeInvoker is the same,
> TPS becomes much lower.
> So it is a matter of the number of actions whose homeInvoker is the same,
> though having more users than containers can also harm performance.
> This is because the controller must send those actions to the same invoker
> even though there are other idle invokers.
> In my proposal, controllers can schedule an activation to any invoker, so
> this does not happen.
>
>
> And regarding the issue of the sheer number of Kafka topics, let me
> share my idea.
>
> 1. Data size is not changed.
>
> If we have 1000 activation requests, they will be spread among invoker
> topics. Let's say we have 10 invokers; then ideally each topic will have
> 100 messages.
> In my proposal, if I have 10 actions, each topic will have 100 messages as
> well.
> Surely there will be more actions than invokers, so data will be spread
> across more topics, but the total data size is unchanged.
>
> 2. Data size depends on the number of active actions.
>
> For example, say we have one million actions and, in turn, one million topics
> in Kafka.
> If only half of them are executed, then there will be data for only half
> of them.
> For the other half of the topics, there will be no data, and they won't
> affect performance.
>
> 3. Things to consider.
>
> Let me describe what happens if there are more Kafka topics.
>
> Let's say there are 3 invokers with 5 activations each in the current
> implementation; then it would look like this:
>
> invoker0: 0 1 2 3 4 (5 messages) -> consumer0
> invoker1: 0 1 2 3 4 -> consumer1
> invoker2: 0 1 2 3 4 -> consumer2
>
> Now if I have 15 actions with 15 topics in my proposal:
>
> action0: 0 -> consumer0
> action1: 0 -> consumer1
> action2: 0 -> consumer2
> action3: 0 -> consumer3
> .
> .
> .
> action14: 0 -> consumer14
>
> Kafka utilizes the page cache to maximize performance.
> Since the size of the data is unchanged, the data kept in the page cache is
> also unchanged.
> But the number of parallel accesses to the data increases, which I think
> might add some overhead.
>
> That's the reason why I performed the benchmark with multiple topics.
>
> | # of topics | Kafka TPS |
> | ----------- | --------- |
> | 50          | 34,488    |
> | 100         | 34,502    |
> | 200         | 31,781    |
> | 500         | 30,324    |
> | 1,000       | 30,855    |
>
> As you can see, there is some overhead from the increased parallel access to
> the data: TPS degrades by about 4,000 as the number of topics increases.
>
> But we can still support 30K TPS with 1,000 topics using 3 Kafka nodes.
> If we need more TPS, we can just add more nodes.
> Since Kafka can scale out horizontally, if we add 3 more servers, I expect
> we can get 60K TPS.
>
> Partitions in Kafka are evenly distributed among the nodes in a cluster.
> Each node becomes the leader for some partitions. If we have 100 partitions
> with 4 Kafka nodes, ideally each Kafka node will be the leader for 25
> partitions.
> Then consumers can read messages directly from the different partition
> leaders. This is why Kafka can scale out horizontally.
>
> Even though the number of topics is increased, if we add more Kafka nodes,
> the number of partitions managed by any one Kafka node is unchanged.
> So if we can support 30K TPS with 1,000 topics using 3 nodes, then we can
> still get 60K TPS with 2,000 topics using 6 nodes.
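The partition-leader arithmetic above can be sketched numerically. This is a simplified model assuming pure round-robin leader assignment (real Kafka also balances replicas and racks); `leaders_per_node` is an illustrative name, not a Kafka API:

```python
# Simplified model of Kafka partition-leader placement: leaders are
# assigned round-robin across broker nodes. Real Kafka's assignment
# also accounts for replicas and rack awareness; this sketch does not.
def leaders_per_node(num_partitions, num_nodes):
    counts = [0] * num_nodes
    for p in range(num_partitions):
        counts[p % num_nodes] += 1
    return counts

# 100 partitions over 4 nodes -> each node leads 25 partitions.
print(leaders_per_node(100, 4))        # [25, 25, 25, 25]

# Doubling topics while doubling nodes keeps per-node leadership flat,
# which is the scale-out argument: 1000 topics / 3 nodes carries the
# same per-node load as 2000 topics / 6 nodes.
print(max(leaders_per_node(1000, 3)))  # 334
print(max(leaders_per_node(2000, 6)))  # 334
```

This is the reason consumers can fan out across brokers: each topic's reads land on whichever node happens to lead its partition, so adding nodes spreads the parallel page-cache access that the benchmark above measures.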
> Similarly, if we have enough Kafka nodes, the number of partitions in one > Kafka nodes will be same though we have one million concurrent invocations. > > This is what I am thinking. > If I miss anything, kindly let me know. > > Thanks > Regards > Dominic. > > > > > > 2018-05-26 13:22 GMT+09:00 Tyson Norris <[email protected]>: > >> Hi Dominic - >> >> Thanks for the detailed presentation! It is helpful to go through to >> understand your points - well done. >> >> >> A couple of comments: >> >> - I'm not sure how unbounded topic (and partition) growth will be >> handled, realistically. AFAIK, these are not free of resource consumption >> at the client or. >> >> - In your test it looks like you have 990 vusers example (pdf page 121) - >> are these using different namespaces? I ask because I think the current >> impl isolates the container per namespace, so if you are limited to 180 >> containers, I can see how there will be container removal/restarts in the >> case where the number of users greatly outnumbers the number of containers >> - I'm not sure if the test behaves this way, or your "New implementation" >> behaves similar? (does a container get reused for many different >> namespaces?) >> >> >> I'm interested to know if there are any kafka experts here that can >> provide more comments on the topics/partition handling question? I will >> also ask for some additional feedback from other colleagues. >> >> >> I will gather some more comments to share, but wanted to start off with >> these. Will continue next week after the long (US holiday) weekend. >> >> >> Thanks >> >> Tyson >> >> >> ________________________________ >> From: Dominic Kim <[email protected]> >> Sent: Friday, May 25, 2018 1:58:55 AM >> To: [email protected] >> Subject: Re: New scheduling algorithm proposal. >> >> Dear Whiskers. >> >> I uploaded the material that I used to give a speech at last biweekly >> meeting. 
>>
>> https://cwiki.apache.org/confluence/display/OPENWHISK/Autonomous+Container+Scheduling
>>
>> This document mainly describes the following things:
>>
>> 1. Current implementation details
>> 2. Potential issues with the current implementation
>> 3. New scheduling algorithm proposal: Autonomous Container Scheduling
>> 4. Review of previous issues under the new scheduling algorithm
>> 5. Pros and cons of the new scheduling algorithm
>> 6. Performance evaluation with prototyping
>>
>>
>> I hope this is helpful for many Whiskers to understand the issues and the
>> new proposal.
>>
>> Thanks
>> Best regards
>> Dominic
>>
>>
>> 2018-05-18 18:44 GMT+09:00 Dominic Kim <[email protected]>:
>>
>> > Hi Tyson
>> > Thank you for the comments.
>> >
>> > First, the total data size in Kafka would not change, since the number of
>> > activation requests stays the same even though we activate more topics.
>> > The same amount of data will just be split into different topics, so there
>> > would be no need for more disk space in Kafka.
>> > But it will increase parallel processing at the Kafka layer: more
>> > consumers will access Kafka at the same time.
>> >
>> > The reason why active/inactive is meaningful here is that the amount of
>> > parallel processing depends on it.
>> > Though we have 100k and more topics (actions), if only 10 actions are being
>> > invoked, there will be only 10 parallel consumers.
>> > The number of inactive topics does not affect the performance of active
>> > consumers.
>> > If we really need to support 100k parallel action invocations, surely we
>> > need more nodes to handle them, and not only for Kafka.
>> > Kafka can horizontally scale out and number of active topics at some >> point >> > will always be lesser than the total number of topics, Based on my >> > benchmark results, I expect it is enough to take with scale-out of >> Kafka. >> > >> > Regarding topic cleanup, we don't need to clean them up by ourselves, >> > Kafka will clean up expired data based on retention configuration. >> > So if the topic is no more activated(the action is no more invoked), >> there >> > would be no actual data though the topic exists. >> > And as I said, even if data exists for some topics, they won't affect >> > other active consumers if they are not activated. >> > >> > Regarding concurrent activation PR, it is very worthy change. And I >> > recognize it is orthogonal to my PR. >> > It will not only alleviate current performance issue but can be used >> with >> > my changes as well. >> > >> > In current logics, controller schedule activations based on hash, your >> PR >> > would be much effective if some changes are made on scheduling logic. >> > >> > Regarding bypassing kafka, I am inclined to use Kafka because it can act >> > as a kind of buffer. >> > If some activations are not processed due to some reasons such as lack >> of >> > resources or invoker failure and so on, Kafka can keep them up for some >> > times and guarantee `at most once` invocation though invocation might >> be a >> > bit delayed. >> > >> > With regard to combined approach, I think that is great idea. >> > For that, container states should be shared among invokers and they can >> > send activation request to any containers(local, or remote). >> > As so invokers will utilize warmed resources which are not located in >> its >> > local. >> > >> > But it will also introduce some synchronization issue among controllers >> > and invokers or it needs segregation between resources based scheduling >> at >> > controller and real invocation. 
>> > In the earlier case, since controller will schedule activations based on >> > resources status, it is required to synchronize them in realtime. >> > Invokers can send requests to any remote containers, there will be >> > mismatch in resource status between controllers and invokers. >> > >> > In the later case, controller should be able to send requests to any >> > invokers then invoker will schedule the activations. >> > In this case also, invokers need to synchronize their container status >> > among them. >> > >> > Under the situation all invokers have same resources status, if two >> > invokers received same action invocation requests, it's not easy to >> control >> > the traffic among them, because they will schedule requests to same >> > containers. And if we take similar approach with what you suggested, to >> > send intent to use the containers first, it will introduce increasing >> > latency overhead as more and more invokers joined the cluster. >> > I couldn't find any good way to handle this yet. And this is why I >> > proposed autonomous containerProxy to enable location free scheduling. >> > >> > Finally regarding SPI, yes you are correct, ContainerProxy is highly >> > dependent on ContainerPool, I will update my PR as you guided. >> > >> > Thanks >> > Regards >> > Dominic. >> > >> > >> > 2018-05-18 2:22 GMT+09:00 Tyson Norris <[email protected]>: >> > >> >> Hi Dominic - >> >> >> >> I share similar concerns about an unbounded number of topics, despite >> >> testing with 10k topics. I’m not sure a topic being considered active >> vs >> >> inactive makes a difference from broker/consumer perspective? I think >> there >> >> would minimally have to be some topic cleanup that happens, and I’m not >> >> sure the impact of deleting topics in bulk will have on the system >> either. 
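Dominic's point earlier in the thread that retention handles cleanup automatically (inactive topics exist but hold no data) can be illustrated with a toy model. The tick-based timestamps and the `prune` function are illustrative simplifications; real Kafka expires whole log segments per `retention.ms`, not individual messages:

```python
# Toy model of time-based retention: each topic holds (timestamp, msg)
# pairs, and cleanup drops anything older than the retention window.
# This mimics the effect of Kafka's retention.ms, not its mechanism.
def prune(topics, now, retention):
    return {t: [(ts, m) for ts, m in msgs if now - ts <= retention]
            for t, msgs in topics.items()}

topics = {
    "action-hot":  [(95, "a"), (99, "b")],  # invoked recently
    "action-cold": [(10, "x"), (20, "y")],  # not invoked for a long time
}
pruned = prune(topics, now=100, retention=30)
print(len(pruned["action-hot"]))   # recent data survives
print(len(pruned["action-cold"]))  # topic still exists, but holds no data
```

Under this model no explicit bulk topic deletion is needed for correctness; Tyson's concern then reduces to the metadata cost of the empty topics themselves, which is what the topic-count benchmark above probes.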
>> >> >> >> A couple of tangent notes related to container reuse to improve >> >> performance: >> >> - I’m putting together the concurrent activation PR[1] (to allow reuse >> of >> >> an warm container for multiple activations concurrently); this can >> improve >> >> throughput for those actions that can tolerate it (FYI per-action >> config is >> >> not implemented yet). It suffers from similar inaccuracy of kafka >> message >> >> ingestion at invoker “how many messages should I read”? But I think we >> can >> >> tune this a bit by adding some intelligence to Invoker/MessageFeed >> like “if >> >> I never see ContainerPool indicate it is busy, read more next time” - >> that >> >> is, allow ContainerPool to backpressure MessageFeed based on ability to >> >> consume, and not (as today) strictly on consume+process. >> >> >> >> - Another variant we are investigating is putting a ContainerPool into >> >> Controller. This will prevent container reuse across controllers >> (bad!), >> >> but will bypass kafka(good!). I think this will be plausible for >> actions >> >> that support concurrency, and may be useful for anything that runs as >> >> blocking to improve a few ms of latency, but I’m not sure of all the >> >> ramifications yet. >> >> >> >> >> >> Another (more far out) approach combines some of these is changing the >> >> “scheduling” concept to be more resource reservation and garbage >> >> collection. Specifically that the ContainerPool could be a combination >> of >> >> self-managed resources AND remote managed resources. If no proper >> (warm) >> >> container exists locally or remotely, a self-managed one is created, >> and >> >> advertised. Other ContainerPool instances can leverage the remote >> resources >> >> (containers). To pause or remove a container requires advertising >> intent to >> >> change state, and giving clients time to veto. 
So there is some added
>> >> complication in the start/reserve/pause/rm container lifecycle, but the
>> >> case for reuse is maximized in the best-case scenario (concurrency-tolerant
>> >> actions), and concurrency-intolerant actions have a chance to leverage a
>> >> broader pool of containers (iff the ability to reserve a shared available
>> >> container is fast enough, compared to starting a new cold one). There is a
>> >> lot wrapped up in there (how resources are advertised, what the new
>> >> lifecycle states are, etc.), so take this idea with a grain of salt.
>> >>
>> >> Specific to your PR: do you need an SPI for ContainerProxy? Or can it
>> >> just be designated by the ContainerPool impl to use a specific
>> >> ContainerProxy variant? I think these are now, and will continue to be,
>> >> tied closely together, so I would manage them as a single SPI.
>> >>
>> >> Thanks
>> >> Tyson
>> >>
>> >> [1] https://github.com/apache/incubator-openwhisk/pull/2795#pullrequestreview-115830270
>> >>
>> >> On May 16, 2018, at 10:42 PM, Dominic Kim <[email protected]> wrote:
>> >>
>> >> Dear all.
>> >>
>> >> Does anyone have any comments on this?
>> >> Any comments or opinions would be greatly welcome : )
>> >>
>> >> I think we need roughly the following changes to take this in.
>> >>
>> >> 1.
SPI support for ContainerProxy and ContainerPool (I already opened a PR
>> >> for this: https://github.com/apache/incubator-openwhisk/pull/3663)
>> >> 2. SPI support for Throttling/Entitlement logic
>> >> 3. New loadbalancer
>> >> 4. New ContainerPool and ContainerProxy
>> >> 5. New notions of limits and logic to handle more fine-grained limits
>> >> (they should be able to coexist with the existing limits)
>> >>
>> >> If there are no comments, I will open PRs based on this one by one.
>> >> Then it would be better for us to discuss this, because we can directly
>> >> discuss over a basic implementation.
>> >>
>> >> Thanks
>> >> Regards
>> >> Dominic.
>> >>
>> >>
>> >>
>> >> 2018-05-09 11:42 GMT+09:00 Dominic Kim <[email protected]>:
>> >>
>> >> One more thing to clarify is invoker parallelism.
>> >>
>> >> ## Invoker coordinates all requests.
>> >>
>> >> I tend to disagree with the "cannot take advantage of parallel
>> >> processing" bit. Everything in the invoker is parallelized after updating
>> >> its central state (which should take a **very** short amount of time
>> >> relative to actual action runtime). It is not really optimized to scale to
>> >> a lot of containers *yet*.
>> >>
>> >> More precisely, I think it is related to the Kafka consumer rather than
>> >> the invoker. Invoker logic can run in parallel, but `MessageFeed` seemingly
>> >> cannot. Once the outstanding message count reaches the maximum, it waits for
>> >> messages to be processed. If a few activation messages for an action are
>> >> not properly handled, `MessageFeed` does not consume more messages from
>> >> Kafka (until some messages are processed).
>> >> So subsequent messages for other actions cannot be fetched or delayed >> due >> >> to unprocessed messages. This is why I mentioned invoker parallelism. I >> >> think I should rephrase it as `MessageFeed` parallelism. >> >> >> >> As you know partition is unit of parallelism in Kafka. If we have >> multiple >> >> partitions for activation topic, we can setup multiple consumers and it >> >> will enable parallel processing for Kafka messages as well. >> >> Since logics in invoker can already run in parallel, with this change, >> we >> >> can process messages entirely in parallel. >> >> >> >> In my proposal, I split activation messages from container coordination >> >> message(action parallelism), assign more partition for activation >> >> messages(in-topic parallelism) and enable parallel processing with >> >> multiple >> >> consumers(containers). >> >> >> >> >> >> Thanks >> >> Regards >> >> Dominic. >> >> >> >> >> >> >> >> >> >> >> >> 2018-05-08 19:34 GMT+09:00 Dominic Kim <[email protected]<mailto:st >> >> [email protected]>>: >> >> >> >> Thank you for the response Markus and Christian. >> >> >> >> Yes I agree that we need to discuss this proposal in abstract way >> instead >> >> in conjunction it with any specific technology because we can take >> better >> >> software stack if possible. >> >> Let me answer your questions line by line. >> >> >> >> >> >> ## Does not wait for previous run. >> >> >> >> Yes it is valid thoughts. If we keep cumulating requests in the queue, >> >> latency can be spiky especially in case execution time of action is >> huge. >> >> So if we want to take this in, we need to find proper way to balance >> >> creating more containers for latency and making existing containers >> handle >> >> requests. >> >> >> >> >> >> ## Not able to accurately control concurrent invocation. >> >> >> >> Ok I originally thought this is related to concurrent containers rather >> >> than concurrent activations. 
>> >> But I am still inclined to concurrent containers approach. >> >> In current logic, it is dependent on factors other than real concurrent >> >> invocations. >> >> If RTT between controllers and invokers becomes high for some reasons, >> >> controller will reject new requests though invokers are actually idle. >> >> >> >> ## TPS is not deterministic. >> >> >> >> I meant not deterministic TPS for just one user rather I meant >> >> system-wide deterministic TPS. >> >> Surely TPS can vary when heterogenous actions(which have different >> >> execution time) are invoked. >> >> But currently it's not easy to figure out what the TPS is with only 1 >> >> kind of action because it is changed based on not only heterogeneity of >> >> actions but the number of users and namespaces. >> >> >> >> I think at least we need to be able to have this kind of official spec: >> >> In case actions with 20 ms execution time are invoked, our system TPS >> is >> >> 20,000 TPS(no matter how many users or namespaces are used). >> >> >> >> >> >> Your understanding about my proposal is perfectly correct. >> >> Small thing to add is, controller sends `ContainerCreation` request >> based >> >> on processing speed of containers rather than availability of existing >> >> containers. >> >> >> >> BTW, regarding your concern about Kafka topic, I think we may be fine >> >> because, >> >> the number of topics will be unbounded, but the number of active topics >> >> will be bounded. >> >> >> >> If we take this approach, it is mandatory to limit retention bytes and >> >> duration for each topics. >> >> So the number of active topics is limited and actual data in them are >> >> also limited, so I think that would be fine. >> >> >> >> But it is necessary to have optimal configurations for retention and >> many >> >> benchmark to confirm this. >> >> >> >> >> >> And I didn't get the meaning of eventual consistency of consumer lag. 
>> >> You meant that is eventual consistent because it changes very quickly >> >> even within one second? >> >> >> >> >> >> Thanks >> >> Regards >> >> Dominic >> >> >> >> >> >> >> >> 2018-05-08 17:25 GMT+09:00 Markus Thoemmes <[email protected] >> <ma >> >> ilto:[email protected]>>: >> >> >> >> Hey Dominic, >> >> >> >> Thank you for the very detailed writeup. Since there is a lot in here, >> >> please allow me to rephrase some of your proposals to see if I >> understood >> >> correctly. I'll go through point-by-point to try to keep it close to >> your >> >> proposal. >> >> >> >> **Note:** This is a result of an extensive discussion of Christian >> >> Bickel (@cbickel) and myself on this proposal. I used "I" throughout >> the >> >> writeup for easier readability, but all of it can be read as "we". >> >> >> >> # Issues: >> >> >> >> ## Interventions of actions. >> >> >> >> That's a valid concern when using today's loadbalancer. This is >> >> noisy-neighbor behavior that can happen today under the circumstances >> you >> >> describe. >> >> >> >> ## Does not wait for previous run. >> >> >> >> True as well today. The algorithms used until today value correctness >> >> over performance. You're right, that you could track the expected queue >> >> occupation and schedule accordingly. That does have its own risks >> though >> >> (what if your action has very spiky latency behavior?). >> >> >> >> I'd generally propose to break this out into a seperate discussion. It >> >> doesn't really correlate to the other points, WDYT? >> >> >> >> ## Invoker coordinates all requests. >> >> >> >> I tend to disagree with the "cannot take advantage of parallel >> >> processing" bit. Everything in the invoker is parallelized after >> updating >> >> its central state (which should take a **very** short amount of time >> >> relative to actual action runtime). It is not really optimized to >> scale to >> >> a lot of containers *yet*. 
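The `MessageFeed` behavior Dominic describes earlier in the thread (a bounded outstanding-message window shared by all actions) can be illustrated with a toy head-of-line-blocking simulation. The tick-based cost model and `drain_ticks` are assumptions for illustration, not the actual OpenWhisk implementation:

```python
from collections import deque

# Toy model of a single feed with a bounded outstanding-message window.
# Messages are (action, cost_in_ticks); a new message is fetched only
# when a window slot frees up, so one slow action delays unrelated ones.
def drain_ticks(messages, window):
    queue = deque(messages)
    outstanding = []  # remaining ticks of in-flight messages
    ticks = 0
    while queue or outstanding:
        while queue and len(outstanding) < window:
            outstanding.append(queue.popleft()[1])
        # advance one tick; finished messages leave the window
        outstanding = [t - 1 for t in outstanding if t - 1 > 0]
        ticks += 1
    return ticks

# One slow message (10 ticks) ahead of nine fast ones (1 tick each):
mixed = [("slow", 10)] + [("fast", 1)] * 9
print(drain_ticks(mixed, window=2))            # -> 10
# With a per-action feed, the fast action drains independently:
print(drain_ticks([("fast", 1)] * 9, window=2))  # -> 5
```

In the shared feed the fast messages trickle out one per tick behind the slow one, while a dedicated feed clears them in half the time; this is the effect the per-action-topic proposal aims to remove.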
>> >> >> >> ## Not able to accurately control concurrent invocation. >> >> >> >> Well, the limits are "concurrent actions in the system". You should be >> >> able to get 5 activations on the queue with today's mechanism. You >> should >> >> get as many containers as needed to handle your load. For very >> >> short-running actions, you might not need N containers to handle N >> >> messages >> >> in the queue. >> >> >> >> ## TPS is not deterministic. >> >> >> >> I'm wondering: Have TPS been deterministic for just one user? I'd argue >> >> that this is a valid metric on its own kind. I agree that these numbers >> >> can >> >> drop significantly under heterogeneous load. >> >> >> >> # Proposal: >> >> >> >> I'll try to rephrase and add some bits of abstraction here and there to >> >> see if I understood this correctly: >> >> >> >> The controller should schedule based on individual actions. It should >> >> not send those to an arbitrary invoker but rather to something that >> >> identifies those actions themselves (a kafka topic in your example). >> I'll >> >> call this *PerActionContainerPool*. Those calls from the controller >> will >> >> be >> >> handled by each *ContainerProxy* directly rather than being threaded >> >> through another "centralized" component (the invoker). The >> >> *ContainerProxy* >> >> is responsible for handling the "aftermath": Writing activation >> records, >> >> collecting logs etc (like today). >> >> >> >> Iff the controller thinks that the existing containers cannot sustain >> >> the load (i.e. if all containers are currently in use), it advises a >> >> *ContainerCreationSystem* (all invokers combined in your case) to >> create a >> >> new container. This container will be added to the >> >> *PerActionContainerPool*. >> >> >> >> The invoker in your proposal has no scheduling logic at all (which is >> >> sound with the issues lined out above) other than container creation >> >> itself. 
>> >> >> >> # Conclusion: >> >> >> >> I like the proposal in the abstract way I've tried to phrase above. It >> >> indeed amplifies warm-container usage and in general should be >> superior to >> >> the more statistical approach of today's loadbalancer. >> >> >> >> I think we should discuss this proposal in an abstract, >> >> non-technology-bound way. I do think that having so many kafka topics >> >> including all the rebalancing needed can become an issue, especially >> >> because the sheer number of kafka topics is unbounded. I also think >> that >> >> the consumer lag is subject to eventual consistency and depending on >> how >> >> eventual that is it can turn into queueing in your system, even though >> >> that >> >> wouldn't be necessary from a capacity perspective. >> >> >> >> I don't want to ditch the proposal because of those concerns though! >> >> >> >> As I said: The proposal itself makes a lot of sense and I like it a >> lot! >> >> Let's not trap ourselves in the technology used today though. You're >> >> proposing a major restructuring so we might as well think more >> >> green-fieldy. WDYT? >> >> >> >> Cheers, >> >> Christian and Markus >> >> >> >> >> >> >> >> >> >> >> >> >> > >> > >
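A footnote on the deterministic-TPS figure discussed earlier in the thread ("actions with 20 ms execution time at 20,000 TPS"): the container capacity such a spec implies follows from Little's law (concurrency = throughput × latency). A quick sanity check, ignoring queueing, cold starts, and overhead:

```python
# Little's law: L = lambda * W. Sustaining a target TPS with a given
# action execution time needs at least L concurrently running containers
# (a lower bound; queueing, cold starts, and overhead are ignored).
def containers_needed(tps, exec_time_s):
    return tps * exec_time_s

# 20,000 TPS of 20 ms actions -> about 400 concurrent containers.
print(containers_needed(20_000, 0.020))
```

Against the ~30-34K Kafka TPS measured in the benchmarks above, this suggests that for such a target the broker would not be the first bottleneck; container capacity would be.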
