errose28 commented on code in PR #7583: URL: https://github.com/apache/ozone/pull/7583#discussion_r1940247445
########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,477 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 request (on average) at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing with prototype for this feature pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: Currently, ratis serialize all the execution in order. With this new feature, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Currently, Locks are taken at bucket level for both read and write flow. With this new feature, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Currently, Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. With this new feature, its planned to remove this Cache. +4. Double buffer code complexity: Currently, Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. With this new feature, its planned to remove Double Buffer. +5. Request execution flow optimization: With new feature, its planned to optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With this new feature, its going to add parallelism in execution, and will improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. + +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client retry / replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All requests executed in parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +Batching of Request: +- Request 1..n are executed and db changes are identified and added to queue (and request will be waiting for update via ratis over Future waiting) +- Batcher will retrieve Request 1..n and db changes, merge those request to single Ratis Request message +- Send Merged Request message to all nodes via ratis and receive reply +- Batcher will reply to each request 1..n with db update success notifying future object of each request. + +There are multiple batchers waiting over queue, +- As soon as queue have entry, and the batcher is available, it will pick all request from queue for processing +- batcher will be un-available when its processing the batch, i.e. merge request and send to ratis and then waiting for reply + +As performance Test result, Number of batcher with "5->8" performed the best. +- Higher number of batcher reduces the effective batching, and performance reduces +- Lower number of batcher reduces throughput as more request will be waiting for ratis response + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. Review Comment: Are you referencing HDFS here with in memory inodes and journal nodes? Synchronizing on DB changes is the way consensus usually works IME. Synchronizing on abstract commands like we do currently is very error prone because the critical applyTransaction step is unique for each command. If in-memory state like caches need to be updated that would be considered part of the state machine and updated on applyTransaction as well. The separation proposed here does make this type of write-through caching more difficult because the applyTransaction entries are already serialized. This should be called out in the doc and tested as well. Are things like a full cache of the bucket table that saves the deserialization step actually helping read performance of can we drop them? If they are needed, there would likely need to be some sort of caching support added: - The [module](https://github.com/apache/ozone/pull/7583#discussion_r1911800743) would support callbacks to update a cache on applyTransaction in an async manner. - Caching could be applied at the column family level, similar to what we currently have. - Cache misses would read through to the DB. - Cache update failures would not be fatal since the value is already in the DB. - This prevents cases like we currently have where failures in applyTransaction unique to specific requests crash the OM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
