thinkharderdev opened a new issue, #554:
URL: https://github.com/apache/arrow-ballista/issues/554

   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   
   Currently, persistent state is managed through the `StateBackendClient` trait, which is a very low-level interface over a key-value store. This is quite flexible but has a couple of drawbacks:
   
   1. It imposes serde overhead even when none is needed. For instance, in the recently added `MemoryBackendState`, everything is kept in memory, so there is no real reason anything needs to be serialized to `Vec<u8>`.
   2. Much more importantly, it fixes access patterns in a way that prevents specific implementations from using backend-specific features such as batch operations, atomic counters, key expiration, etc. This has the effect of forcing the use of distributed locking in cases where it could potentially be avoided.
   
   For example, with Redis as a backend it is possible to do global task slot allocation atomically, without any distributed locking, using a hash and a little bit of server-side scripting (see the sketch below). But there is no way to express that through the current interface.
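
   As a rough illustration only, here is what that could look like using the `redis` crate's script support. The `executor_slots` hash name (mapping executor id to free slot count) is made up for the sketch; the Lua script runs atomically on the server, so no distributed lock is needed:

   ```rust
   use redis::Script;

   // Scan the hash of executor_id -> free slots, decrementing counts
   // until the request is satisfied. Runs as a single atomic operation.
   const RESERVE_SLOTS: &str = r#"
   local remaining = tonumber(ARGV[1])
   local result = {}
   local executors = redis.call('HGETALL', KEYS[1])
   for i = 1, #executors, 2 do
     if remaining <= 0 then break end
     local free = tonumber(executors[i + 1]) or 0
     if free > 0 then
       local take = math.min(free, remaining)
       redis.call('HINCRBY', KEYS[1], executors[i], -take)
       remaining = remaining - take
       table.insert(result, executors[i])
       table.insert(result, take)
     end
   end
   return result
   "#;

   /// Reserve up to `num_slots` slots across all executors in one atomic
   /// server-side step, returning (executor_id, slots_taken) pairs.
   async fn reserve_slots(
       conn: &mut impl redis::aio::ConnectionLike,
       num_slots: u32,
   ) -> redis::RedisResult<Vec<(String, u32)>> {
       let flat: Vec<String> = Script::new(RESERVE_SLOTS)
           .key("executor_slots")
           .arg(num_slots)
           .invoke_async(conn)
           .await?;
       Ok(flat
           .chunks_exact(2)
           .map(|c| (c[0].clone(), c[1].parse().unwrap_or(0)))
           .collect())
   }
   ```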
   
   **Describe the solution you'd like**
   
   Break the current `StateBackendClient` trait into two separate higher-level 
interfaces:
   
   ```rust
   #[async_trait]
   trait JobState {
     async fn save_job(&self, job_id: &str, graph: ExecutionGraph) -> Result<()>;
     async fn fail_job(&self, job_id: &str, reason: String, graph: Option<ExecutionGraph>) -> Result<()>;
     async fn remove_job(&self, job_id: &str) -> Result<()>;
     async fn save_session(&self, ctx: &Arc<SessionContext>) -> Result<()>;
     async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>>;
   }

   #[async_trait]
   trait ClusterState {
     async fn reserve_slots(&self, num_slots: u32) -> Result<Vec<ExecutorReservation>>;
     /// Either reserve all `num_slots` slots or none
     async fn reserve_slots_exact(&self, num_slots: u32) -> Result<Vec<ExecutorReservation>>;
     async fn cancel_reservations(&self, reservations: Vec<ExecutorReservation>) -> Result<()>;
     async fn register_executor(&self, metadata: ExecutorMetadata, spec: ExecutorSpecification, reserve: bool) -> Result<Vec<ExecutorReservation>>;
     async fn remove_executor(&self, executor_id: &str) -> Result<()>;
   }
   ```
   
   This is only a sketch and these interfaces may need to be tweaked in various 
ways, but I think the benefits of the general approach are significant:
   
   1. It pushes locking down as an implementation detail of the state backend, so we can leverage advanced features of particular data stores.
   2. It makes application code more concise, since we no longer have to deal with the fussy details of the low-level interface.
   3. Implementations can "pick and choose" what needs to be visible globally (e.g. to all schedulers in a cluster), which gives us more flexibility in deciding where to make tradeoffs between resiliency and performance.
   4. Where we only need to keep state in memory, we can eliminate the serialization overhead entirely (see the sketch below).
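
   For points 1 and 4, a minimal sketch of an in-memory `JobState` built on the `dashmap` and `async-trait` crates (`InMemoryJobState` is a hypothetical name, `ExecutionGraph` is assumed in scope as in the trait sketch above, and I'm using Ballista's existing `BallistaError::General` for the error case). Graphs and sessions are stored as live values, so there is no serde round-trip, and all locking is internal to the concurrent map:

   ```rust
   use std::sync::Arc;

   use async_trait::async_trait;
   use ballista_core::error::{BallistaError, Result};
   use dashmap::DashMap;
   use datafusion::prelude::SessionContext;

   #[derive(Default)]
   struct InMemoryJobState {
       // Live values, never serialized to `Vec<u8>`.
       jobs: DashMap<String, ExecutionGraph>,
       sessions: DashMap<String, Arc<SessionContext>>,
   }

   #[async_trait]
   impl JobState for InMemoryJobState {
       async fn save_job(&self, job_id: &str, graph: ExecutionGraph) -> Result<()> {
           self.jobs.insert(job_id.to_owned(), graph);
           Ok(())
       }

       async fn fail_job(&self, job_id: &str, _reason: String, graph: Option<ExecutionGraph>) -> Result<()> {
           // Persist the final (failed) graph if we have one; a real impl
           // would also record `reason` somewhere queryable.
           if let Some(graph) = graph {
               self.jobs.insert(job_id.to_owned(), graph);
           }
           Ok(())
       }

       async fn remove_job(&self, job_id: &str) -> Result<()> {
           self.jobs.remove(job_id);
           Ok(())
       }

       async fn save_session(&self, ctx: &Arc<SessionContext>) -> Result<()> {
           self.sessions.insert(ctx.session_id(), Arc::clone(ctx));
           Ok(())
       }

       async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>> {
           self.sessions
               .get(session_id)
               .map(|s| Arc::clone(s.value()))
               .ok_or_else(|| BallistaError::General(format!("no session {session_id}")))
       }
   }
   ```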
   
   Note that the above traits don't have a way to `Watch` keys. I think the only place we use that is for listening to executor heartbeats, which can probably be internalized into the `ClusterState` implementation (see the sketch below); if not, it should be easy to work into this design.
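
   If heartbeats do get internalized, one possible shape (a sketch assuming `tokio` and `dashmap`; all names here are hypothetical) is a background task owned by the `ClusterState` impl, with no `Watch` API exposed at all:

   ```rust
   use std::sync::Arc;
   use std::time::{Duration, Instant};

   use dashmap::DashMap;
   use tokio::sync::mpsc;

   struct HeartbeatTracker {
       last_seen: Arc<DashMap<String, Instant>>,
   }

   impl HeartbeatTracker {
       /// Spawn a task that consumes executor ids from a heartbeat stream
       /// and records when each executor was last seen.
       fn start(mut heartbeats: mpsc::Receiver<String>) -> Self {
           let last_seen: Arc<DashMap<String, Instant>> = Arc::new(DashMap::new());
           let seen = Arc::clone(&last_seen);
           tokio::spawn(async move {
               while let Some(executor_id) = heartbeats.recv().await {
                   seen.insert(executor_id, Instant::now());
               }
           });
           Self { last_seen }
       }

       /// Executors whose last heartbeat is older than `timeout`; a
       /// `ClusterState` impl could call this before handing out slots.
       fn expired(&self, timeout: Duration) -> Vec<String> {
           self.last_seen
               .iter()
               .filter(|e| e.value().elapsed() > timeout)
               .map(|e| e.key().clone())
               .collect()
       }
   }
   ```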
   
   **Describe alternatives you've considered**
   
   We could simply not do this and keep the current low-level interface.
   
   **Additional context**
   
   As briefly mentioned above, this mostly comes from the desire to make Ballista truly highly available. The basic outline of the HA solution is pretty straightforward:
   
   1. Run multiple schedulers which manage all job state in memory.
   2. Handle sharding of jobs across schedulers on the client side using consistent hashing (see the sketch below).
   3. Use Redis as the backend for task slots/executor data so we can do atomic, lock-free slot allocation.
   4. Profit!
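
   For step 2, a minimal sketch of a client-side hash ring with virtual nodes (the `SchedulerRing` name and scheduler addresses are made up). With virtual nodes, adding or removing a scheduler only remaps a small fraction of jobs:

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::collections::BTreeMap;
   use std::hash::{Hash, Hasher};

   struct SchedulerRing {
       ring: BTreeMap<u64, String>,
   }

   impl SchedulerRing {
       /// Place each scheduler at `replicas` points on the ring so load
       /// stays balanced and rebalancing is incremental.
       fn new(schedulers: &[&str], replicas: usize) -> Self {
           let mut ring = BTreeMap::new();
           for s in schedulers {
               for i in 0..replicas {
                   ring.insert(hash(&format!("{s}:{i}")), s.to_string());
               }
           }
           Self { ring }
       }

       /// First ring point clockwise from the job's hash, wrapping around.
       fn scheduler_for(&self, job_id: &str) -> &str {
           let h = hash(job_id);
           self.ring
               .range(h..)
               .next()
               .or_else(|| self.ring.iter().next())
               .map(|(_, s)| s.as_str())
               .expect("ring is not empty")
       }
   }

   // NOTE: a real client needs a hash that is stable across processes and
   // versions; std's DefaultHasher is used only to keep the sketch
   // dependency-free.
   fn hash(s: &str) -> u64 {
       let mut hasher = DefaultHasher::new();
       s.hash(&mut hasher);
       hasher.finish()
   }

   // Usage:
   // let ring = SchedulerRing::new(&["scheduler-0:50050", "scheduler-1:50050"], 64);
   // let target = ring.scheduler_for("job-abc123");
   ```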
   

