thinkharderdev commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1628653290
> ```
> local:
> INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 588.435µs
> etcd:
> INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 77.631762ms
> ```
>
> These days I am testing the performance of etcd as the storage backend, and I found that it is very poor, especially when reserving and releasing resources: the time spent here is much longer than in the local storage mode. I suspect the global distributed lock is the cause. If so, do you have any suggestions here? @thinkharderdev @avantgardnerio
Yes, we found the same with etcd. I would suggest that if you don't need HA, then use a single scheduler with in-memory state. If you do need HA, what we did is implement a `ClusterState` backed by Redis. I haven't found the time to upstream the implementation yet, but it's relatively simple, and I will try to do that soon. The gist is that you use a Redis hash for holding the free task slots (a map of `executor_id -> task_slots`) and a Lua script for atomic reservation/freeing. Something roughly like:
```rust
// Assumes the usual imports for the ballista types referenced below
// (ClusterState, TaskDistribution, ExecutorReservation, BallistaError, Result)
// and a helper `into_ballista_error` for converting redis errors.
use std::collections::{HashMap, HashSet};

use log::debug;
use redis::Script;

// Reserve up to ARGV[1] slots across the executors given in ARGV[2..], biased
// toward the order in which the ids are passed. KEYS[1] is the slots hash.
// Returns a JSON array with the number of slots taken from each executor.
const RESERVATION_SCRIPT: &str = r#"
local desired_slots = tonumber(ARGV[1])
local s = {}
for i = 2, #ARGV do
    if redis.call('HEXISTS', KEYS[1], ARGV[i]) == 1 then
        local slots = tonumber(redis.call('HGET', KEYS[1], ARGV[i]))
        if slots >= desired_slots then
            s[i - 1] = desired_slots
            redis.call('HINCRBY', KEYS[1], ARGV[i], -desired_slots)
            desired_slots = 0
        elseif slots == 0 then
            s[i - 1] = 0
        else
            s[i - 1] = slots
            redis.call('HINCRBY', KEYS[1], ARGV[i], -slots)
            desired_slots = desired_slots - slots
        end
    else
        s[i - 1] = 0
    end
    if desired_slots <= 0 then
        break
    end
end
return cjson.encode(s)
"#;

// Return previously reserved slots. KEYS[1] is the slots hash, KEYS[2..] are
// executor ids and ARGV[i - 1] is the number of slots to return to KEYS[i].
const CANCEL_SCRIPT: &str = r#"
local cancelled = 0
for i = 2, #KEYS do
    if redis.call('HEXISTS', KEYS[1], KEYS[i]) == 1 then
        local inc = tonumber(ARGV[i - 1])
        redis.call('HINCRBY', KEYS[1], KEYS[i], inc)
        cancelled = cancelled + inc
    end
end
return cancelled
"#;

const SLOTS_KEY: &str = "task-slots";

// The trait has async methods, so this assumes it is defined with #[async_trait].
#[async_trait::async_trait]
impl ClusterState for MyRedisState {
    async fn reserve_slots(
        &self,
        num_slots: u32,
        _distribution: TaskDistribution,
        executors: Option<HashSet<String>>,
    ) -> Result<Vec<ExecutorReservation>> {
        if num_slots == 0 {
            return Ok(vec![]);
        }

        if let Some(executors) = executors {
            if !executors.is_empty() {
                let mut connection = self.get_connection().await?;

                let script = Script::new(RESERVATION_SCRIPT);
                let mut script = script.key(SLOTS_KEY);
                script.arg(num_slots);

                let executor_ids: Vec<String> = executors.into_iter().collect();
                for executor_id in &executor_ids {
                    script.arg(executor_id);
                }

                let result: String = script
                    .invoke_async(&mut connection)
                    .await
                    .map_err(into_ballista_error)?;

                // The script returns a JSON array of reserved slot counts in
                // the same order as the executor ids we passed in.
                let reserved_per_executor =
                    serde_json::from_str::<Vec<u32>>(&result).map_err(|e| {
                        BallistaError::General(format!(
                            "Error executing reservations, unexpected response from redis: {e:?}"
                        ))
                    })?;

                let reservations: Vec<ExecutorReservation> = executor_ids
                    .into_iter()
                    .zip(reserved_per_executor)
                    .flat_map(|(id, reserved)| {
                        (0..reserved)
                            .map(move |_| ExecutorReservation::new_free(id.clone()))
                    })
                    .collect();

                return Ok(reservations);
            }
        }

        Ok(vec![])
    }

    async fn cancel_reservations(
        &self,
        reservations: Vec<ExecutorReservation>,
    ) -> Result<()> {
        if reservations.is_empty() {
            return Ok(());
        }

        let mut connection = self.get_connection().await?;

        let script = Script::new(CANCEL_SCRIPT);
        let mut script = script.key(SLOTS_KEY);

        // Count how many slots to hand back to each executor.
        let mut slots_per_executor: HashMap<String, usize> = HashMap::new();
        for reservation in reservations {
            *slots_per_executor
                .entry(reservation.executor_id)
                .or_default() += 1;
        }

        for (executor_id, slots) in slots_per_executor {
            script.key(executor_id);
            script.arg(slots);
        }

        let cancelled: u64 = script
            .invoke_async(&mut connection)
            .await
            .map_err(into_ballista_error)?;

        debug!("Cancelled {} reservations", cancelled);

        Ok(())
    }
}
```
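One thing the snippet leaves out is how the hash gets populated: the reservation script only decrements fields that already exist, so the scheduler has to write each executor's free slots into `task-slots` when the executor registers (and `HDEL` the field when it goes away). A minimal sketch of that, assuming a multiplexed connection from the `redis` crate; the helper name and signature are mine, not part of the actual implementation:
```rust
/// Hypothetical helper (not from the actual implementation): seed or refresh
/// an executor's free task slots in the hash read by RESERVATION_SCRIPT.
async fn seed_executor_slots(
    connection: &mut redis::aio::MultiplexedConnection,
    executor_id: &str,
    task_slots: u32,
) -> redis::RedisResult<()> {
    // HSET task-slots <executor_id> <task_slots>
    redis::cmd("HSET")
        .arg(SLOTS_KEY)
        .arg(executor_id)
        .arg(task_slots)
        .query_async(connection)
        .await
}
```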
Note that this only supports `TaskDistribution::Bias`; for round-robin task distribution you would need a different Lua script (which we have not implemented), but it could be done in principle. See the sketch below for one possible shape.
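A round-robin variant could keep the same hash layout and only change how the script walks the executors. A minimal sketch of what that might look like (my own illustration, not the implementation described above): take one slot per executor per pass until the request is satisfied or no executor has free slots.
```rust
// Hypothetical round-robin reservation script (not part of the implementation
// above). ARGV[1] is the number of slots requested, ARGV[2..] are executor ids
// and KEYS[1] is the slots hash; it assumes at least one executor id is passed.
const ROUND_ROBIN_RESERVATION_SCRIPT: &str = r#"
local desired = tonumber(ARGV[1])
local reserved = {}
for i = 2, #ARGV do
    reserved[i - 1] = 0
end
local progress = true
while desired > 0 and progress do
    progress = false
    for i = 2, #ARGV do
        if desired <= 0 then
            break
        end
        -- HGET returns false for a missing field, so default to '0'
        local slots = tonumber(redis.call('HGET', KEYS[1], ARGV[i]) or '0')
        if slots > 0 then
            redis.call('HINCRBY', KEYS[1], ARGV[i], -1)
            reserved[i - 1] = reserved[i - 1] + 1
            desired = desired - 1
            progress = true
        end
    end
end
return cjson.encode(reserved)
"#;
```
Since the Rust side already decodes a JSON array of per-executor counts, it could invoke this script the same way as `RESERVATION_SCRIPT` and just branch on the `TaskDistribution` argument instead of ignoring it.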