errose28 commented on code in PR #7583: URL: https://github.com/apache/ozone/pull/7583#discussion_r1911734977
########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. 
Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. 
+ +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. 
+ +### Apply Transaction (via ratis at all nodes)

Review Comment:
There's another step after this that needs to be specified: we don't return success to the client until the apply transaction for their request has completed on the leader.

########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent.
+2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. Review Comment: The current phrasing does not make it clear that these are things this feature aims to remove. The other items listed are things it is going to add or improve. ########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. 
It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. 
In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. + +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. +- there will not be any double buffer and all changes will be flushed to db immediately. +- there will be few specific action like snapshot creation of db, upgrade handling which will be done at node. 
+ +## Description + +### Index generation + +In old flow, ratis index is used for `object Id` of key and `update Id` for key update. +For new flow, it will not depend on ratis index, but will have its own **`managed index`**. + +Index initialization / update: +- First time startup: 0 +- On restart (leader): last preserved index + 1 +- On Switch over: last index + 1 +- Request execution: index + 1 +- Upgrade: Last Ratis index + 1 + + +#### Index Persistence: + +Index Preserved in TransactionInfo Table with new KEY: "#KEYINDEX" +Format: <timestamp>#<index> +Time stamp: This will be used to identify last saved transaction executed +Index: index identifier of the request + +Sync the Index to other nodes: +Special request body having metadata: [Execution Control Message](leader-execution.md#control-request). + + +#### Step-by-step incremental changes for existing flow + +1. for increment changes, need remove dependency with ratis index. For this, need to use om managed index in both old and new flow. +2. objectId generation: need follow old logic of index to objectId mapping. Review Comment: These steps aren't clear to me. This section also needs to cover update ID handling. ########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. 
+ +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. + +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. 
+ +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. +- there will not be any double buffer and all changes will be flushed to db immediately. +- there will be few specific action like snapshot creation of db, upgrade handling which will be done at node. + +## Description + +### Index generation + +In old flow, ratis index is used for `object Id` of key and `update Id` for key update. +For new flow, it will not depend on ratis index, but will have its own **`managed index`**. + +Index initialization / update: +- First time startup: 0 +- On restart (leader): last preserved index + 1 +- On Switch over: last index + 1 +- Request execution: index + 1 +- Upgrade: Last Ratis index + 1 + + +#### Index Persistence: + +Index Preserved in TransactionInfo Table with new KEY: "#KEYINDEX" +Format: <timestamp>#<index> +Time stamp: This will be used to identify last saved transaction executed +Index: index identifier of the request + +Sync the Index to other nodes: +Special request body having metadata: [Execution Control Message](leader-execution.md#control-request). + + +#### Step-by-step incremental changes for existing flow + +1. for increment changes, need remove dependency with ratis index. For this, need to use om managed index in both old and new flow. +2. objectId generation: need follow old logic of index to objectId mapping. + +### No-Cache for write operation + +In old flow, a key creation / updation is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes. +Since DB changes is done in batches, so a cache is maintained till flush of DB is completed. Cache is maintained so that OM can serve further request till flush is completed. 
+ +This adds complexity during read for the keys, as it needs ensure to have the latest data from cache or DB. +Since there can be parallel operation of adding keys to cache, removal from cache and flush to db, this induces bug to the code if this is not handled properly. + +For new flow, partial table cache is removed, and changes are visible as soon as changes are flushed to db. +For this to achieve, +- granular locking for key operation to avoid parallel update till the existing operation completes. This avoids need of cache as data is available only after changes are persisted. +- Double buffer operation removal for the flow, flush is done immediately before response is returned. This is no more needed as no need to serve next request as current reply is not done. +- Bucket resource is handled in such a way that its visible only after db changes are flushed. This is required as quota is shared between different keys operating parallel. +Note: For incremental changes, quota count will be available immediately for read for compatibility with older flow till all flows are migrated to new flow. + +### Quota handling (Bucket Quota) + +Earlier, bucket level lock is taken, quota validation is performed and updated with-in lock in cache in all nodes. +During startup before persistence to db, request is re-executed from ratis log and bucket quota cache is prepared again. +So once bucket quota is updated in cache, it will remain same (As recovered during startup with same re-execution). + +Now request is getting executed at leader node, so bucket case will not be able to recover if crash happens. So it can be updated in BucketTable cache only after its persisted. + + + +For bucket quota in new flow, +- When processing key commit, the quota will be `reserved` at leader. 
+- Bucket quota changes will be distributed to all other nodes via ratis +- At all nodes, key changes is flushed to DB, during that time, quota change will be updated to BucketTable, and quota reserve will be reset. +- On failure, reserve quota for the request will be reset. + +`Bucket Resource Quota` will store quota information with respect to `index` also and same will be used to reset on request handling, +- At leader node after request is send to ratis in success and failure path (as default always) with `request index` +- At all nodes on apply transaction, quota is reset with request index. + So in all cases, reserved quota can be removed in processing of request. + +Cases: +1. Quota is reserved at leader node but sync to other nodes fail, quota will be reset always +2. Quota is updated at leader node in apply transaction, it will reset quota to avoid double quota increase +3. Quota is updated at follower node in apply transaction, reset as no impact as `Bucket Quota resource` will not have any entry for the the request + +### Granular locking +Gateway: Perform lock as per below strategy for OBS/FSO +On lock success, trigger execution of request to respective executor queue + +#### OBS Locking +refer [OBS locking](obs-locking.md) + +#### FSO Locking +TODO + +Challenges compared to OBS, +1. Implicit directory creation +2. file Id depends on parent directory /<volId>/<bucketId>/<parent ObjectId>/<file name> +So due to hierarchy in nature and parallel operation at various level, FSO locking is more complicated. + +#### Legacy Locking: +Not-in-scope + +### Optimized new flow + +Currently, a request is handled as: +- Pre-execute: does request static validation, authorization +- validateAndUpdateCache: locking, handle request, update cache +- Double buffer to update DB using cache happening in background + +Request execution Template: every request handling need follow below template of request execution. 
+ +- preProcess: basic request validation, update parameter like user info, normalization of key +- authorize: perform ranger or native acl validation +- lock: granular level locking +- unlock: unlock locked keys +- process: process request like: + - Validation after lock like bucket details + - Retrieve previous key, create new key, quota update, and so on + - Record changes for db update + - Prepare response + - Audit and logging + - Metrics update +- Request validator annotation: similar to existing, where compatibility check with ozone manager feature version and client feature version, and update request to support compatibility if any. + +Detailed request processing: +OBS: +- [Create key](request/obs-create-key.md) +- [Commit key](request/obs-commit-key.md) + +### Execution persist and distribution

Review Comment:
I think this whole section needs to be redesigned. In theory, Ratis + RocksDB should be able to exist in its own module as a replicated DB with no dependencies on anything Ozone-specific. We will need this eventually to bring the same code flow to SCM (for rolling upgrade) and Recon (for non-voting follower) without rewriting these critical pieces that deal with replication and persistence. Actually moving the code to separate modules may be outside the scope of this feature, but we need to define the API surface now so that we can avoid rewriting or refactoring code that will itself be new by then.

For this example I will refer to the replicated DB as its own module, even if V1 of the code does not structure it this way for migration purposes. It is the API surface used by each request that is more important to lock down now.

Input to this module should take the form of protos that define the DB updates to perform. The actual values written to the DB should already have been serialized to bytes by this point, and they should not be deserialized at any point later in the flow (with the exception of merges).
This means the module has no knowledge of client ID, quota info, etc.

We would have one proto message defining each operation supported by the DB. The module takes one `Batch`, which contains these operations and will be treated as one Ratis request:

```
syntax = "proto2";

// Field numbers are illustrative.
message Put {
  optional bytes key = 1;
  optional bytes value = 2;
}

message Delete {
  optional bytes key = 1;
}

message Merge {
  optional bytes key = 1;
  optional bytes value = 2;
}

message Checkpoint {
  // Path to place the checkpoint
  optional string destination = 1;
}

// Only one field should be present to define the operation to do.
// The module can validate this input.
message Operation {
  optional Put put = 1;
  optional Delete delete = 2;
  optional Merge merge = 3;
  optional Checkpoint checkpoint = 4;
}

// Each OM request would result in one batch of ordered operations submitted to the module.
// The module can internally combine these Batches into one larger Batch proto that gets submitted to Ratis.
// The update to the transaction ID table needs to be handled within the module for each batch applied.
message Batch {
  repeated Operation operations = 1;
}
```

Now to translate each proto to a DB update:
- `Put` and `Delete` simply map to existing RocksDB put and delete key ops. Note that RocksDB does not have a move operation.
- `Checkpoint` creates a RocksDB checkpoint and will be used by snapshots.
- `Merge` will be used to implement any increments required, like quota, using the RocksDB associative [merge operator](https://github.com/facebook/rocksdb/wiki/merge-operator). Initializers of the module will pass in a mapping of column families to their corresponding merge operators if required.
  - For example, the OM would initialize the module with a `BucketInfoMergeOperator` on the `BucketTable`, a `VolumeInfoMergeOperator` on the `VolumeTable`, etc.

Then the API surface between OM (or any other service) and the replicated DB module is just a map of merge operator callbacks provided on construction, and calls to submit new `Batch`es to the module.
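
As a rough illustration only, the API surface described above could be sketched as follows. This is not code from the PR or from Ozone: `ReplicatedDb`, `add_int64`, and the `table` field on each operation are hypothetical names (the protos above do not say how an operation selects its column family), Python dictionaries stand in for RocksDB, and there is no Ratis replication, atomic write batch, or checkpoint support here.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Union

# Hypothetical merge-operator signature: (existing value or None, operand) -> new value.
MergeOperator = Callable[[Optional[bytes], bytes], bytes]


@dataclass(frozen=True)
class Put:
    table: str  # assumption: some way to name the column family is needed
    key: bytes
    value: bytes


@dataclass(frozen=True)
class Delete:
    table: str
    key: bytes


@dataclass(frozen=True)
class Merge:
    table: str
    key: bytes
    value: bytes


Operation = Union[Put, Delete, Merge]


@dataclass
class Batch:
    operations: List[Operation] = field(default_factory=list)


class ReplicatedDb:
    """In-memory stand-in for the replicated DB module; values stay opaque bytes."""

    def __init__(self, merge_operators: Dict[str, MergeOperator]):
        self._merge_operators = merge_operators
        self._tables: Dict[str, Dict[bytes, bytes]] = {}
        self.last_applied_index = 0  # stands in for the transaction ID table

    def apply(self, batch: Batch) -> int:
        """Apply the operations in order, then advance the applied index."""
        for op in batch.operations:
            table = self._tables.setdefault(op.table, {})
            if isinstance(op, Put):
                table[op.key] = op.value
            elif isinstance(op, Delete):
                table.pop(op.key, None)
            elif isinstance(op, Merge):
                merge = self._merge_operators[op.table]
                table[op.key] = merge(table.get(op.key), op.value)
            else:
                raise TypeError(f"unsupported operation: {op!r}")
        self.last_applied_index += 1
        return self.last_applied_index

    def get(self, table: str, key: bytes) -> Optional[bytes]:
        return self._tables.get(table, {}).get(key)


def add_int64(existing: Optional[bytes], operand: bytes) -> bytes:
    """Toy associative merge: treat both values as signed 64-bit counters."""
    current = int.from_bytes(existing, "big", signed=True) if existing else 0
    delta = int.from_bytes(operand, "big", signed=True)
    return (current + delta).to_bytes(8, "big", signed=True)


# Usage: the caller supplies merge operators at construction, then submits batches.
example_db = ReplicatedDb({"bucketTable": add_int64})
example_db.apply(Batch([
    Put("keyTable", b"/vol/bucket/key1", b"serialized-key-info"),
    Merge("bucketTable", b"/vol/bucket", (4096).to_bytes(8, "big", signed=True)),
]))
```

The point of the sketch is that the module only ever sees bytes and a per-table merge callback, so nothing in it depends on OM request types.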
########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. 
Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. 
+ +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. +- there will not be any double buffer and all changes will be flushed to db immediately. +- there will be few specific action like snapshot creation of db, upgrade handling which will be done at node. + +## Description + +### Index generation + +In old flow, ratis index is used for `object Id` of key and `update Id` for key update. +For new flow, it will not depend on ratis index, but will have its own **`managed index`**. 
+ +Index initialization / update: +- First time startup: 0 +- On restart (leader): last preserved index + 1 +- On Switch over: last index + 1 +- Request execution: index + 1 +- Upgrade: Last Ratis index + 1 + + +#### Index Persistence: + +Index Preserved in TransactionInfo Table with new KEY: "#KEYINDEX" +Format: <timestamp>#<index> +Time stamp: This will be used to identify last saved transaction executed +Index: index identifier of the request Review Comment: Please check the rendered version of this section; I don't think it is being displayed as intended. ########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. 
+- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. 
+ +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. +- there will not be any double buffer and all changes will be flushed to db immediately. +- there will be few specific action like snapshot creation of db, upgrade handling which will be done at node. + +## Description + +### Index generation + +In old flow, ratis index is used for `object Id` of key and `update Id` for key update. +For new flow, it will not depend on ratis index, but will have its own **`managed index`**. 
+ +Index initialization / update: +- First time startup: 0 +- On restart (leader): last preserved index + 1 +- On Switch over: last index + 1 +- Request execution: index + 1 +- Upgrade: Last Ratis index + 1 + + +#### Index Persistence: Review Comment: Please add a lot more detail to this section; it doesn't really explain how this will work. I assume there is going to be some sort of atomic long incremented in memory. The control request section also does not add much information to explain this. ########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. 
The early testing pushes this to 40k transactions per second. Review Comment: Is my understanding here correct? ```suggestion - The Current performance envelope for OM is around 12k transactions per second. The early testing of this feature pushes this to 40k transactions per second. ``` ########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. 
With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. 
+ +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. +- there will not be any double buffer and all changes will be flushed to db immediately. +- there will be few specific action like snapshot creation of db, upgrade handling which will be done at node. + +## Description + +### Index generation + +In old flow, ratis index is used for `object Id` of key and `update Id` for key update. +For new flow, it will not depend on ratis index, but will have its own **`managed index`**. 
+ +Index initialization / update: +- First time startup: 0 +- On restart (leader): last preserved index + 1 +- On Switch over: last index + 1 +- Request execution: index + 1 +- Upgrade: Last Ratis index + 1 + + +#### Index Persistence: + +Index Preserved in TransactionInfo Table with new KEY: "#KEYINDEX" +Format: <timestamp>#<index> +Time stamp: This will be used to identify last saved transaction executed +Index: index identifier of the request + +Sync the Index to other nodes: +Special request body having metadata: [Execution Control Message](leader-execution.md#control-request). + + +#### Step-by-step incremental changes for existing flow + +1. for increment changes, need remove dependency with ratis index. For this, need to use om managed index in both old and new flow. +2. objectId generation: need follow old logic of index to objectId mapping. + +### No-Cache for write operation + +In old flow, a key creation / updation is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes. +Since DB changes is done in batches, so a cache is maintained till flush of DB is completed. Cache is maintained so that OM can serve further request till flush is completed. + +This adds complexity during read for the keys, as it needs ensure to have the latest data from cache or DB. +Since there can be parallel operation of adding keys to cache, removal from cache and flush to db, this induces bug to the code if this is not handled properly. + +For new flow, partial table cache is removed, and changes are visible as soon as changes are flushed to db. +For this to achieve, +- granular locking for key operation to avoid parallel update till the existing operation completes. This avoids need of cache as data is available only after changes are persisted. +- Double buffer operation removal for the flow, flush is done immediately before response is returned. This is no more needed as no need to serve next request as current reply is not done. 
+- Bucket resource is handled in such a way that its visible only after db changes are flushed. This is required as quota is shared between different keys operating parallel. +Note: For incremental changes, quota count will be available immediately for read for compatibility with older flow till all flows are migrated to new flow. + +### Quota handling (Bucket Quota) + +Earlier, bucket level lock is taken, quota validation is performed and updated with-in lock in cache in all nodes. +During startup before persistence to db, request is re-executed from ratis log and bucket quota cache is prepared again. +So once bucket quota is updated in cache, it will remain same (As recovered during startup with same re-execution). + +Now request is getting executed at leader node, so bucket case will not be able to recover if crash happens. So it can be updated in BucketTable cache only after its persisted. + + + +For bucket quota in new flow, +- When processing key commit, the quota will be `reserved` at leader. +- Bucket quota changes will be distributed to all other nodes via ratis +- At all nodes, key changes is flushed to DB, during that time, quota change will be updated to BucketTable, and quota reserve will be reset. +- On failure, reserve quota for the request will be reset. + +`Bucket Resource Quota` will store quota information with respect to `index` also and same will be used to reset on request handling, +- At leader node after request is send to ratis in success and failure path (as default always) with `request index` +- At all nodes on apply transaction, quota is reset with request index. + So in all cases, reserved quota can be removed in processing of request. + +Cases: +1. Quota is reserved at leader node but sync to other nodes fail, quota will be reset always +2. Quota is updated at leader node in apply transaction, it will reset quota to avoid double quota increase +3. 
Quota is updated at follower node in apply transaction, reset as no impact as `Bucket Quota resource` will not have any entry for the the request + +### Granular locking +Gateway: Perform lock as per below strategy for OBS/FSO +On lock success, trigger execution of request to respective executor queue + +#### OBS Locking +refer [OBS locking](obs-locking.md) + +#### FSO Locking +TODO + +Challenges compared to OBS, +1. Implicit directory creation +2. file Id depends on parent directory /<volId>/<bucketId>/<parent ObjectId>/<file name> +So due to hierarchy in nature and parallel operation at various level, FSO locking is more complicated. + +#### Legacy Locking: +Not-in-scope + +### Optimized new flow + +Currently, a request is handled as: +- Pre-execute: does request static validation, authorization +- validateAndUpdateCache: locking, handle request, update cache +- Double buffer to update DB using cache happening in background + +Request execution Template: every request handling need follow below template of request execution. + +- preProcess: basic request validation, update parameter like user info, normalization of key +- authorize: perform ranger or native acl validation +- lock: granular level locking +- unlock: unlock locked keys Review Comment: What happens while we are holding the lock down? Shouldn't this be where processing is happening? This seems like a duplicate of the information in the "Leader Execution" section but both sections are missing steps. For example submitting to Ratis is not mentioned here anywhere. ########## hadoop-hdds/docs/content/design/leader-execution/leader-execution.md: ########## @@ -0,0 +1,422 @@ +--- +title: Ozone Leader Side Execution +summary: Ozone request execution at leader side +date: 2025-01-06 +jira: HDDS-11898 +status: draft +author: Sumit Agrawal +--- +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Background + +Here is the summary of the challenges: + +- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests. +- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features. +- The number of transactions that can be pushed through Ratis currently caps out around 25k. +- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second. + +## Execution at leader node needs deal with below cases +1. Parallel execution: ratis serialize all the execution in order. With control, it is possible to execute the request in parallel which are independent. +2. Optimized locking: Locks are taken at bucket level for both read and write flow. Here, focus to remove lock between read and write flow, and have more granular locking. +3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation. +4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates. +5. 
Request execution flow optimization: Optimize request execution flow, removing un-necessary operation and improve testability. +6. Performance and resource Optimization: Currently, same execution is repeated at all nodes, and have more failure points. With leader side execution and parallelism, need improve performance and resource utilization. + +### Object ID generation +Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run. + +- If OM adopts multi Ratis to scale writes further, Object IDs will not longer be unique. +- If we shard OM, then across OMs the object ID will not be unique. +- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs. + +Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created. + +## Prototype Performance Result: + +| sno | item | old flow result | leader execution result | +|-----|------------------------------------------|-------------------------------|------------------------| +| 1 | Operation / Second (key create / commit) | 12k+ | 40k+ | +| 2 | Key Commit / Second | 5.9k+ | 20k+ (3.3 times) | +| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% | +| 4 | CPU Utilization Follower | 6% above | 4% below | + +Refer [performance prototype result](performance-prototype-result.pdf) + +# Leader execution + + + +Client --> OM --> Gatekeeper ---> Executor --> Batching (ratis request) --{Ratis sync to all nodes}--> apply transaction {db update} + + +### Gatekeeper +Gatekeeper act as entry point for request execution. Its function is: +1. orchestrate the execution flow +2. granular locking +3. execution of request +4. validate om state like upgrade +5. update metrics and return response +6. 
handle client replay of request +7. managed index generation (remove dependency with ratis index for objectId) + +### Executor +This prepares context for execution, process the request, communicate to all nodes for db changes via ratis and clearing up any cache. + +### Batching (Ratis request) +All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching. + +### Apply Transaction (via ratis at all nodes) +With new flow as change, +- all nodes during ratis apply transaction will just only update the DB for changes. +- there will not be any double buffer and all changes will be flushed to db immediately. +- there will be few specific action like snapshot creation of db, upgrade handling which will be done at node. + +## Description + +### Index generation + +In old flow, ratis index is used for `object Id` of key and `update Id` for key update. +For new flow, it will not depend on ratis index, but will have its own **`managed index`**. + +Index initialization / update: +- First time startup: 0 +- On restart (leader): last preserved index + 1 +- On Switch over: last index + 1 +- Request execution: index + 1 +- Upgrade: Last Ratis index + 1 + + +#### Index Persistence: + +Index Preserved in TransactionInfo Table with new KEY: "#KEYINDEX" +Format: <timestamp>#<index> +Time stamp: This will be used to identify last saved transaction executed +Index: index identifier of the request + +Sync the Index to other nodes: +Special request body having metadata: [Execution Control Message](leader-execution.md#control-request). + + +#### Step-by-step incremental changes for existing flow + +1. for increment changes, need remove dependency with ratis index. For this, need to use om managed index in both old and new flow. +2. objectId generation: need follow old logic of index to objectId mapping. 
+ +### No-Cache for write operation + +In old flow, a key creation / updation is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes. Review Comment: ```suggestion In old flow, a key creation / update is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes. ```
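For reference on the `Index Persistence` comment earlier in this review, which assumes "some sort of atomic long incremented in memory": the following is a minimal sketch of that reading, not the PR's implementation. The `<timestamp>#<index>` value format and the `#KEYINDEX` key are taken from the quoted design doc. The names `ManagedIndex`, `encode_key_index` and `decode_key_index` are hypothetical, and the exact start value after a restart is an assumption, since the doc only says "last preserved index + 1". Python is used purely as notation here.

```python
import threading

# Key under which the design doc says the index is stored in the
# TransactionInfo table.
KEY_INDEX = "#KEYINDEX"


class ManagedIndex:
    """Hypothetical in-memory counter standing in for the OM managed index."""

    def __init__(self, last_persisted=None):
        # Assumption: first-time startup begins at 0; on restart or
        # switch-over the counter resumes from the last preserved index + 1.
        self._value = 0 if last_persisted is None else last_persisted + 1
        self._lock = threading.Lock()

    def next_index(self):
        """Advance the counter by one and return the new value."""
        with self._lock:
            self._value += 1
            return self._value


def encode_key_index(timestamp_ms, index):
    """Build the '<timestamp>#<index>' value described in the design doc."""
    return f"{timestamp_ms}#{index}"


def decode_key_index(value):
    """Parse a '<timestamp>#<index>' value back into (timestamp_ms, index)."""
    timestamp_ms, index = value.split("#", 1)
    return int(timestamp_ms), int(index)
```

Under these assumptions, a leader would call `next_index()` once per request and include the encoded value in the batched Ratis request, so that every node writes the same `#KEYINDEX` record during apply transaction. Whether the doc intends exactly this is the open question in the review comment.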
