martin-g commented on code in PR #1852:
URL: https://github.com/apache/datafusion-ballista/pull/1852#discussion_r3388101961
##########
ballista/scheduler/src/state/aqe/planner.rs:
##########
@@ -151,7 +152,7 @@ impl AdaptivePlanner {
pub async fn try_new(
ctx: &SessionContext,
logical_plan: &LogicalPlan,
- job_name: String,
+ job_name: JobName,
Review Comment:
Maybe this should be changed to `JobId` and the callers updated?
Or pass both, if needed.
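Here is a minimal sketch of the "pass both" option. It assumes `JobId` and `JobName` are plain `String` newtypes. `PlannerSketch` is a hypothetical stand-in for `AdaptivePlanner`, and these constructors are illustrative, not Ballista's actual API. Because the two IDs have distinct types, swapping the arguments fails to compile instead of silently mixing them up.

```rust
// Hypothetical newtypes; Ballista's real `JobId` / `JobName` may differ.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobId(String);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobName(String);

impl JobId {
    pub fn new(id: impl Into<String>) -> Self {
        JobId(id.into())
    }
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl JobName {
    pub fn new(name: impl Into<String>) -> Self {
        JobName(name.into())
    }
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

/// Hypothetical stand-in for `AdaptivePlanner`: keeps the ID and the
/// human-readable name as separate, separately typed fields.
pub struct PlannerSketch {
    pub job_id: JobId,
    pub job_name: JobName,
}

impl PlannerSketch {
    // Distinct parameter types: passing (name, id) in the wrong order is a
    // compile error rather than a runtime bug.
    pub fn try_new(job_id: JobId, job_name: JobName) -> Result<Self, String> {
        if job_id.as_str().is_empty() {
            return Err("job_id must not be empty".to_string());
        }
        Ok(PlannerSketch { job_id, job_name })
    }
}
```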
##########
ballista/executor/src/execution_loop.rs:
##########
Review Comment:
```suggestion
let job_id = task.job_id.into();
```
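The suggestion relies on a `From<String>` conversion for `JobId`. Below is a minimal sketch under two assumptions: `JobId` wraps a `String`, and the task message's `job_id` field is a `String`. `TaskDefinition` and `job_id_of` are hypothetical names.

```rust
// Hypothetical newtype; Ballista's actual `JobId` may be defined differently.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobId(String);

impl From<String> for JobId {
    fn from(s: String) -> Self {
        JobId(s)
    }
}

impl JobId {
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

// Hypothetical stand-in for the protobuf task message, assuming its
// `job_id` field is a plain `String`.
pub struct TaskDefinition {
    pub job_id: String,
}

pub fn job_id_of(task: TaskDefinition) -> JobId {
    // Moves the String into the newtype: no clone, no new allocation.
    task.job_id.into()
}
```

When the task is owned, as here, `into()` moves the string rather than copying it.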
##########
ballista/executor/src/execution_loop.rs:
##########
@@ -275,7 +275,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
})?;
let query_stage_exec = executor.execution_engine.create_query_stage_exec(
- job_id.clone(),
+ job_id.clone().into(),
Review Comment:
```suggestion
job_id.clone(),
```
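When `job_id` is already a `JobId`, the `.into()` resolves to the standard library's blanket `impl<T> From<T> for T` and does nothing. Clippy reports this as `useless_conversion`. The sketch below uses hypothetical names: `create_stage` stands in for `create_query_stage_exec`.

```rust
// Hypothetical newtype for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl From<String> for JobId {
    fn from(s: String) -> Self {
        JobId(s)
    }
}

// Hypothetical stand-in for `create_query_stage_exec`, taking a `JobId`.
pub fn create_stage(job_id: JobId) -> String {
    format!("stage for {}", job_id.0)
}

pub fn call_sites(job_id: &JobId) -> (String, String) {
    // Compiles, but this `.into()` is the identity conversion
    // `From<JobId> for JobId`, so it is pure noise.
    let redundant = create_stage(job_id.clone().into());
    // Equivalent and clearer.
    let direct = create_stage(job_id.clone());
    (redundant, direct)
}
```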
##########
ballista/scheduler/src/scheduler_server/event.rs:
##########
@@ -150,6 +150,7 @@ impl Debug for QueryStageSchedulerEvent {
QueryStageSchedulerEvent::JobDataClean(job_id) => {
write!(f, "JobDataClean : job_id={job_id}.")
}
+ // TODO: This is not job_id but Executor ID (based on usage).
Review Comment:
Shall we introduce an `ExecutorId` type too? I think it would be good!
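Here is one possible shape for this, assuming both IDs are `String` newtypes. The payload type of each event variant then documents which kind of ID it carries, so a TODO like the one in the hunk above would not be needed. `Event` and `ExecutorGone` are hypothetical names, not actual variants of `QueryStageSchedulerEvent`.

```rust
use std::fmt;

// Hypothetical newtypes; names and derives are assumptions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobId(String);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExecutorId(String);

impl From<&str> for JobId {
    fn from(s: &str) -> Self {
        JobId(s.to_owned())
    }
}

impl From<&str> for ExecutorId {
    fn from(s: &str) -> Self {
        ExecutorId(s.to_owned())
    }
}

impl fmt::Display for JobId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl fmt::Display for ExecutorId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

// Hypothetical event enum: the payload type says which ID it is.
pub enum Event {
    JobDataClean(JobId),
    ExecutorGone(ExecutorId),
}

impl fmt::Debug for Event {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Event::JobDataClean(job_id) => write!(f, "JobDataClean : job_id={job_id}."),
            Event::ExecutorGone(executor_id) => {
                write!(f, "ExecutorGone : executor_id={executor_id}.")
            }
        }
    }
}
```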
##########
ballista/core/src/execution_plans/sort_shuffle/spill.rs:
##########
@@ -273,7 +274,7 @@ mod tests {
let mut manager = SpillManager::new(
temp_dir.path().to_str().unwrap(),
- "job1",
+ &JobId::new("job1"),
Review Comment:
IMO this would be friendlier with `"job1".into()`
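`SpillManager::new` appears to take `&JobId`, so the call site would read `&"job1".into()`. That type-checks once `JobId` implements `From<&str>`. The minimal sketch below uses a hypothetical `SpillManagerSketch`, because the real constructor's other parameters and return type are not shown in this hunk.

```rust
// Hypothetical newtype for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl JobId {
    pub fn new(id: impl Into<String>) -> Self {
        JobId(id.into())
    }
}

// Enables `"job1".into()` wherever a `JobId` is expected.
impl From<&str> for JobId {
    fn from(s: &str) -> Self {
        JobId(s.to_owned())
    }
}

// Hypothetical stand-in for `SpillManager`.
pub struct SpillManagerSketch {
    pub dir: String,
    pub job_id: JobId,
}

impl SpillManagerSketch {
    pub fn new(dir: &str, job_id: &JobId) -> Self {
        SpillManagerSketch {
            dir: dir.to_owned(),
            job_id: job_id.clone(),
        }
    }
}
```

Usage: `SpillManagerSketch::new(path, &"job1".into())` builds the same value as `SpillManagerSketch::new(path, &JobId::new("job1"))`.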
##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -568,11 +569,12 @@ async fn execute_query_push(
status,
flight_proxy,
} = item;
- let job_id = status
+ let job_id: JobId = status
.as_ref()
.map(|s| s.job_id.to_owned())
- .unwrap_or("unknown_job_id".to_string()); // should not happen
- if !job_id.starts_with("unknown_") {
+ .unwrap_or("unknown_job_id".to_string()) // should not happen
Review Comment:
```suggestion
.unwrap_or_else(|| "unknown_job_id".to_string()) // should not happen
```
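The change matters because `unwrap_or` evaluates its argument eagerly. It allocates the fallback `String` even when the status is present. `unwrap_or_else` only calls its closure on `None`. The self-contained sketch below counts the fallback allocations; `fallback`, `eager`, and `lazy` are illustrative names.

```rust
use std::cell::Cell;

// Builds the fallback ID and records each call, so eager vs. lazy
// evaluation becomes observable.
pub fn fallback(calls: &Cell<u32>) -> String {
    calls.set(calls.get() + 1);
    "unknown_job_id".to_string()
}

pub fn eager(job_id: Option<String>, calls: &Cell<u32>) -> String {
    // `fallback(calls)` runs before `unwrap_or`, even for `Some`.
    job_id.unwrap_or(fallback(calls))
}

pub fn lazy(job_id: Option<String>, calls: &Cell<u32>) -> String {
    // The closure runs only when `job_id` is `None`.
    job_id.unwrap_or_else(|| fallback(calls))
}
```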
##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -835,14 +838,14 @@ mod test {
LogicalPlan::default(),
"session".to_string(),
));
- *exec.job_id.lock() = Some("job-123".to_string());
+ *exec.job_id.lock() = Some("job-123".to_owned().into());
Review Comment:
All `"job_id".to_owned().into()` calls could be shortened to `"job_id".into()`.
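This sketch assumes `JobId` implements both `From<String>` and `From<&str>`; only the conversions visible in the diff are known. Under that assumption, the two spellings produce the same value. The shorter one simply moves the `to_owned()` into the `From` impl.

```rust
// Hypothetical newtype for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl From<String> for JobId {
    fn from(s: String) -> Self {
        JobId(s)
    }
}

impl From<&str> for JobId {
    fn from(s: &str) -> Self {
        JobId(s.to_owned())
    }
}

pub fn set_both_ways() -> (Option<JobId>, Option<JobId>) {
    // Allocates a String at the call site, then wraps it via `From<String>`.
    let verbose: Option<JobId> = Some("job-123".to_owned().into());
    // Same result via `From<&str>`; the allocation happens inside `from`.
    let short: Option<JobId> = Some("job-123".into());
    (verbose, short)
}
```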
##########
ballista/executor/src/execution_loop.rs:
##########
@@ -285,7 +285,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
dedicated_executor.spawn(async move {
use std::panic::AssertUnwindSafe;
let part = PartitionId {
- job_id: job_id.clone(),
+ job_id: job_id.clone().into(),
Review Comment:
```suggestion
job_id: job_id.clone(),
```
##########
ballista/scheduler/src/state/aqe/planner.rs:
##########
@@ -359,12 +364,10 @@ impl AdaptivePlanner {
// that would arise if the rule walked the entire residual
// plan in `default_optimizers()`.
let plan = CoalescePartitionsRule.optimize(plan, config)?;
- BallistaAdapter::adapt_to_ballista(
- plan,
- self.job_name.as_str(),
- config,
- )
- .map(|w| (w.plan.stage_id(), w))
+ // adapt_to_ballista takes an job_id, we are passing a job_name. Need to transform to fix compiler.
+ let job_id = self.job_name.clone().into_inner().into();
Review Comment:
I have the feeling that `self.job_name` really should have been named `self.job_id`.
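Here is a sketch of what the rename could look like. It assumes the field holds the job's ID and that the adapter can take a `&JobId`. The diff only shows that `adapt_to_ballista` takes a job ID, so `PlannerSketch` and this `adapt_to_ballista` signature are hypothetical. With the field typed as `JobId`, the `clone().into_inner().into()` round-trip disappears.

```rust
// Hypothetical newtype for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl JobId {
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

// Hypothetical stand-in for `BallistaAdapter::adapt_to_ballista`; the real
// signature (owned vs. borrowed ID, plan and config types) is an assumption.
pub fn adapt_to_ballista(plan: &str, job_id: &JobId) -> String {
    format!("{} @ {}", plan, job_id.as_str())
}

// Hypothetical planner: the field stores the job's ID, so it is named and
// typed as one.
pub struct PlannerSketch {
    job_id: JobId,
}

impl PlannerSketch {
    pub fn new(job_id: JobId) -> Self {
        PlannerSketch { job_id }
    }

    pub fn adapt(&self, plan: &str) -> String {
        // No conversion needed: the field already has the right type.
        adapt_to_ballista(plan, &self.job_id)
    }
}
```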
--
This is an automated message from the Apache Git Service.