thinkharderdev commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1628653290

   > ```
   > local:
   > INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 588.435µs
   > etcd:
   > INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 77.631762ms
   > ```
   > 
   > These days I am testing the performance of etcd as a storage backend, and I have found it to be very poor, especially when reserving and releasing resources: the time spent there is much longer than in the local storage mode. I suspect the cause is the global distributed lock. Do you have any suggestions? @thinkharderdev @avantgardnerio
   
   Yes, we found the same with etcd. If you don't need HA, I would suggest using a single scheduler with in-memory state. If you do need HA, what we did is implement a `ClusterState` backed by Redis. I haven't found the time to upstream the implementation yet, but it's relatively simple and I will try to do that soon. The gist is that you use a Redis hash (an `hmap`) for holding the free task slots (so a map `executor_id -> task_slots`) and a Lua script for atomic reservation/freeing. Something roughly like:
   
   ```rust
   use std::collections::{HashMap, HashSet};

   use async_trait::async_trait;
   use log::debug;
   use redis::Script;

   // Ballista imports (paths may vary slightly between versions):
   use ballista_core::error::{BallistaError, Result};
   use ballista_scheduler::cluster::{ClusterState, TaskDistribution};
   use ballista_scheduler::state::executor_manager::ExecutorReservation;

   // `into_ballista_error` is a small helper (not shown) that converts a
   // redis::RedisError into a BallistaError.

   // Atomically reserve up to ARGV[1] slots from the executors named in
   // ARGV[2..], decrementing the free-slot hash at KEYS[1] and returning a
   // JSON array with the number of slots taken from each executor, in
   // ARGV order.
   const RESERVATION_SCRIPT: &str = r#"
   local desired_slots = tonumber(ARGV[1])
   local s = {}
   for i=2, #ARGV do
       local exists = redis.call('HEXISTS', KEYS[1], ARGV[i])
       if( exists == 1 ) then
           local value = redis.call('HGET', KEYS[1], ARGV[i])
           local slots = tonumber(value)
           if( slots >= desired_slots ) then
               s[i - 1] = desired_slots
               local inc = -desired_slots
               redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
               desired_slots = 0
           elseif slots == 0 then
               s[i - 1] = 0
           else
               s[i - 1] = slots
               local inc = -slots
               redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
               desired_slots = desired_slots - slots
           end
       else
           s[i - 1] = 0
       end
   
       if( desired_slots <= 0 ) then
           break
       end
   end
   return cjson.encode(s)
   "#;
   
   // Return previously reserved slots to the executors named in KEYS[2..],
   // incrementing the free-slot hash at KEYS[1] by the counts in ARGV and
   // returning the total number of slots returned.
   const CANCEL_SCRIPT: &str = r#"
   local cancelled = 0
   for i=2, #KEYS do
       local exists = redis.call('HEXISTS', KEYS[1], KEYS[i])
       if( exists == 1 ) then
           local inc = tonumber(ARGV[i - 1])
           redis.call('HINCRBY', KEYS[1], KEYS[i], inc)
           cancelled = cancelled + inc
       end
   end
   return cancelled
   "#;
   
   const SLOTS_KEY: &str = "task-slots";
   
   #[async_trait]
   impl ClusterState for MyRedisState {
       // Only `TaskDistribution::Bias` is supported (see the note after the
       // code block); the `_distribution` parameter is ignored.
       async fn reserve_slots(
           &self,
           num_slots: u32,
           _distribution: TaskDistribution,
           executors: Option<HashSet<String>>,
       ) -> Result<Vec<ExecutorReservation>> {
           if num_slots == 0 {
               return Ok(vec![]);
           }
   
           if let Some(executors) = executors {
               let mut connection = self.get_connection().await?;
   
               let script = Script::new(RESERVATION_SCRIPT);
   
               let mut script = script.key(SLOTS_KEY);
               script.arg(num_slots);
   
               if !executors.is_empty() {
                   let executor_ids: Vec<String> = executors.into_iter().collect();
                   for executor_id in &executor_ids {
                       script.arg(executor_id);
                   }
   
                   let result: String = match script.invoke_async(&mut connection).await {
                       Ok(result) => result,
                       Err(e) => return Err(into_ballista_error(e)),
                   };
   
                   let reservations = serde_json::from_str::<Vec<u32>>(&result).map_err(|e| {
                       BallistaError::General(format!(
                           "Error executing reservations, unexpected response from redis: {e:?}"
                       ))
                   })?;
   
                   let reservations: Vec<ExecutorReservation> = executor_ids
                       .into_iter()
                       .zip(reservations)
                       .flat_map(|(id, reserved)| {
                           (0..reserved).map(move |_| ExecutorReservation::new_free(id.clone()))
                       })
                       .collect();
   
                   return Ok(reservations);
               }
           }
   
           Ok(vec![])
       }
   
       async fn cancel_reservations(&self, reservations: Vec<ExecutorReservation>) -> Result<()> {
           let mut connection = self.get_connection().await?;
   
           if !reservations.is_empty() {
               let script = Script::new(CANCEL_SCRIPT);
               let mut script = script.key(SLOTS_KEY);
   
               // Count the reservations per executor. Note that itertools'
               // `group_by` only groups *consecutive* elements, so counting
               // directly into a map avoids undercounting when reservations
               // for the same executor are interleaved.
               let mut counts: HashMap<String, usize> = HashMap::new();
               for reservation in reservations {
                   *counts.entry(reservation.executor_id).or_default() += 1;
               }

               for (executor_id, slots) in counts {
                   script.key(executor_id);
                   script.arg(slots);
               }
   
               let cancelled: u64 = match script.invoke_async(&mut connection).await {
                   Ok(cancelled) => cancelled,
                   Err(e) => return Err(into_ballista_error(e)),
               };
   
               debug!("Cancelled {} reservations", cancelled);
   
               Ok(())
           } else {
               Ok(())
           }
       }
   }
   
   ```
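   
   One piece not shown above is how the `task-slots` hash gets populated in the first place. A minimal sketch, assuming a hypothetical `register_executor` hook on the same struct (the method name and `total_task_slots` parameter are illustrative, not Ballista API):
   
   ```rust
   impl MyRedisState {
       /// Hypothetical registration hook: seed the executor's free slot
       /// count in the shared hash when it joins the cluster.
       async fn register_executor(&self, executor_id: &str, total_task_slots: u32) -> Result<()> {
           let mut connection = self.get_connection().await?;
           redis::cmd("HSET")
               .arg(SLOTS_KEY)
               .arg(executor_id)
               .arg(total_task_slots)
               .query_async(&mut connection)
               .await
               .map_err(into_ballista_error)
       }
   }
   ```
   
   The matching cleanup on executor removal would be an `HDEL` of the executor's field, so dead executors stop being offered in reservations.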
   
   Note that this only supports `TaskDistribution::Bias`; round-robin task distribution would need a different Lua script, which we have not implemented, but it could be done in principle (see the sketch below).
   
   

