thinkharderdev opened a new pull request, #59:
URL: https://github.com/apache/arrow-ballista/pull/59
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases. You can
link an issue to this PR using the GitHub syntax. For example `Closes #123`
indicates that this PR will close issue #123.
-->
Closes #39
I'm posting this draft PR for review and feedback. There are some more TODO
items still in progress (mostly around cleanup and unit test coverage), but this
change passes the integration tests as is, so it should be ready for a
"test-drive" now.
TODO (before merging)
- [ ] Additional unit test coverage
- [ ] Fix `ExecuteQuery` so we don't do all the planning as part of the gRPC
handler (I changed this to simplify things while testing, but we need to revert
to the old implementation where the planning happens in an async task in the
event loop)
- [ ] General cleanup and documentation.
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly in
the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand your
changes and offer better suggestions for fixes.
-->
See #39 for a complete description; in short, this change addresses the
following issues:
1. Allow for deployments with multiple schedulers for high-availability and
scalability.
2. Maintain fine-grained task state in persistent storage so even
single-scheduler deployments are more robust to scheduler restarts.
# What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it is
sometimes worth providing a summary of the individual changes in this PR.
-->
This is quite a large refactor. See the key points below:
## Event Loop
The core event loop remains mostly unchanged. The biggest change is that
stage scheduling has been mostly internalized by the `ExecutionGraph`, so we
only publish `JobSubmitted` and `JobCompleted` query stage events. Likewise,
the structure of `SchedulerServerEvents` has changed (see details below).
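For illustration only, the slimmed-down query stage event type might look
something like this (the variant payloads are assumptions, not the PR's exact
definitions):

```rust
// Hypothetical sketch: with stage scheduling internalized by the
// ExecutionGraph, only job-level events cross the query stage event loop.
enum QueryStageSchedulerEvent {
    JobSubmitted(String), // job_id
    JobCompleted(String), // job_id
}

fn main() {
    let event = QueryStageSchedulerEvent::JobSubmitted("job-1".to_owned());
    match event {
        QueryStageSchedulerEvent::JobSubmitted(job_id) => println!("submitted {job_id}"),
        QueryStageSchedulerEvent::JobCompleted(job_id) => println!("completed {job_id}"),
    }
}
```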
## ExecutionGraph
In the current implementation, the state of a job is spread across a number
of different data structures, so it would be difficult to move to external
state in its current form, especially in light of the requirements around
distributed locks.
In this change, the `TaskScheduler` and `StageManager` have been replaced
with an `ExecutionGraph` data structure which is serializable (so it can be
persisted in the state backend easily) and encapsulates both the underlying
execution plan and the dependencies between query stages. This data structure
internalizes the order of execution and the availability of tasks and presents
an interface that allows the scheduler to see a job as a stream of discrete
tasks, which I believe makes the logic in the scheduler loop more
straightforward.
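To make the "stream of discrete tasks" idea concrete, here is a rough sketch
of the kind of interface this enables. All names and types below are
illustrative assumptions, not the actual `ExecutionGraph` API:

```rust
// Illustrative sketch only: an ExecutionGraph-like structure that tracks
// stage dependencies and hands out runnable tasks one at a time.
struct Task {
    job_id: String,
    stage_id: usize,
    partition: usize,
}

struct ExecutionGraph {
    job_id: String,
    // The real implementation holds the execution plan and per-stage task
    // state; a flat list of pending tasks stands in for that here.
    pending: Vec<Task>,
}

impl ExecutionGraph {
    /// Pop the next task whose stage dependencies are satisfied, if any.
    fn pop_next_task(&mut self) -> Option<Task> {
        self.pending.pop()
    }

    /// True once every stage has completed.
    fn is_complete(&self) -> bool {
        self.pending.is_empty()
    }
}

fn main() {
    let mut graph = ExecutionGraph {
        job_id: "job-1".to_owned(),
        pending: vec![Task { job_id: "job-1".to_owned(), stage_id: 0, partition: 0 }],
    };
    // The scheduler can treat the whole job as a stream of discrete tasks.
    while let Some(task) = graph.pop_next_task() {
        println!("launch {}/{}/{}", task.job_id, task.stage_id, task.partition);
    }
    assert!(graph.is_complete());
}
```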
## ExecutorReservation
This data structure represents a reserved task slot on a given executor,
which may optionally be tied to a specific job ID. The idea is that, to
avoid lock contention on the backend data store, we "hang on" to task slots
through the scheduler loop instead of immediately returning them to the pool.
So when a scheduler gets a task status update, it has a new task slot
reservation that it can try to fill, and it will only return that slot to the
pool if it cannot find a task that is ready.
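For illustration, a minimal sketch of the reservation concept; the field and
constructor names here are assumptions, not necessarily the PR's exact
definitions:

```rust
// Sketch of the reservation concept: a reserved task slot on an executor,
// optionally pinned to a job so freed slots are offered to that job first.
#[derive(Debug, Clone)]
struct ExecutorReservation {
    executor_id: String,
    // When set, the scheduler tries to fill this slot from this job's
    // ready tasks before returning the slot to the shared pool.
    job_id: Option<String>,
}

impl ExecutorReservation {
    fn new_free(executor_id: String) -> Self {
        Self { executor_id, job_id: None }
    }
    fn new_assigned(executor_id: String, job_id: String) -> Self {
        Self { executor_id, job_id: Some(job_id) }
    }
}

fn main() {
    // A slot freed by a finished task stays pinned to its job.
    let pinned = ExecutorReservation::new_assigned("executor-1".into(), "job-1".into());
    let free = ExecutorReservation::new_free("executor-2".into());
    println!("{pinned:?} {free:?}");
}
```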
## TaskManager
The `TaskManager` encapsulates task/job management within the scheduler. The
two most important things the `TaskManager` does (both sketched after this
list) are:
* `fill_reservations`, which takes a list of `ExecutorReservation`s and
tries to assign a task to each one (with preference given to the reservation's
`job_id` if present). See the doc string for details about the implementation.
* `update_task_statuses`, which applies task status updates received from
the executors and returns a list of `QueryStageSchedulerEvent`s along with a
list of `ExecutorReservation`s to be filled by the scheduler. See the doc
string for details about the implementation.
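Here is a hedged sketch of the shape of those two entry points. The real
methods are async and fallible, and every type here is a simplified stand-in:

```rust
// Stand-ins for the real message and task types.
struct TaskStatus;     // stand-in for the protobuf task status message
struct TaskDefinition; // stand-in for a launchable task
#[derive(Clone)]
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}
enum QueryStageSchedulerEvent {
    JobSubmitted(String),
    JobCompleted(String),
}

struct TaskManager;

impl TaskManager {
    /// Try to assign a task to each reservation, preferring tasks from the
    /// reservation's job_id when set; returns assignments plus unfilled slots.
    fn fill_reservations(
        &self,
        reservations: &[ExecutorReservation],
    ) -> (Vec<(ExecutorReservation, TaskDefinition)>, Vec<ExecutorReservation>) {
        // Stubbed: nothing is runnable, so every reservation comes back unfilled.
        (Vec::new(), reservations.to_vec())
    }

    /// Apply task status updates from an executor, producing scheduler events
    /// and freed reservations for the event loop to fill.
    fn update_task_statuses(
        &self,
        _executor_id: &str,
        _statuses: Vec<TaskStatus>,
    ) -> (Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>) {
        (Vec::new(), Vec::new())
    }
}

fn main() {
    let tm = TaskManager;
    let (_events, reservations) = tm.update_task_statuses("executor-1", vec![TaskStatus]);
    let (_assigned, _unfilled) = tm.fill_reservations(&reservations);
}
```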
## StateBackendClient
I've changed this trait slightly to support this use case (a sketch of the
reshaped trait follows this list):
* Added the concept of a `Keyspace`, which is just a namespace for keys (we
already use "namespace" for something else, so I didn't want to overload the
term). Since `Keyspace` is an enum, this mostly encodes a desired structure
for the storage layer and helps remove boilerplate elsewhere in the codebase.
* Made the `lock` method scoped to a `Keyspace` and `key` so we can lock
individual resources. Using a single global mutex on the state is probably not
scalable.
* Added a `scan` method which returns all key/values in a particular
keyspace (with an optional limit).
* Added a `scan_keys` method which does the same as `scan` but only returns
keys.
* Added a `put_txn` method which allows atomically updating multiple
key/values. Since we do batch updates in many places, this simplifies error
handling. It is implemented in both Sled (using batch operations) and etcd
(using transactions).
* Added a `mv` method (because `move` is a reserved word in Rust :)) that
will atomically move a key from one keyspace to another.
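A rough sketch of the reshaped trait, with the caveat that the real version
is async and returns `Result`s, and the keyspace variant names here are
assumptions:

```rust
/// Keyspace is an enum rather than free-form strings, encoding the desired
/// storage layout and removing key-formatting boilerplate at call sites.
/// (Variant names are illustrative.)
#[derive(Debug, Clone, Copy)]
enum Keyspace {
    ActiveJobs,
    CompletedJobs,
    Slots,
}

/// Stand-in for whatever guard the scoped lock returns.
struct LockGuard;

trait StateBackendClient {
    /// Lock scoped to a single (keyspace, key) pair rather than a global mutex.
    fn lock(&self, keyspace: Keyspace, key: &str) -> LockGuard;
    /// All key/value pairs in a keyspace, with an optional limit.
    fn scan(&self, keyspace: Keyspace, limit: Option<usize>) -> Vec<(String, Vec<u8>)>;
    /// Like `scan`, but returns only the keys.
    fn scan_keys(&self, keyspace: Keyspace) -> Vec<String>;
    /// Atomically apply a batch of puts (Sled batches, etcd transactions).
    fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>);
    /// Atomically move a key between keyspaces (`mv` since `move` is reserved).
    fn mv(&self, from: Keyspace, to: Keyspace, key: &str);
}
```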
## The Scheduler Flow
Putting this all together, the conceptual flow in the scheduler works like so:
### Push Scheduling
* Receive a set of task updates
* Call `TaskManager::update_task_statuses` to apply updates and get back a
set of `ExecutorReservation`s
* Publish a `SchedulerServerEvent::Offer` event with these reservations
* The event loop will receive this event and call
`TaskManager::fill_reservations` to attempt to assign a task to each
reservation, giving priority to the jobs that were just updated.
* For assigned reservations, launch the corresponding tasks.
* For unassigned reservations, cancel (i.e. return the task slots to the
pool)
When a new job is submitted, we will try to reserve task slots up to the
number of tasks in the job's initial stage and launch them. The push path is
sketched below.
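Condensed into a runnable toy, the push path looks roughly like this; every
type and helper below is an illustrative stand-in rather than the PR's exact
API:

```rust
// Stand-ins for the real message and task types.
struct TaskStatus;
struct TaskDefinition;
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}

// Applying updates frees slots as reservations pinned to the updated job.
fn update_task_statuses(_updates: Vec<TaskStatus>) -> Vec<ExecutorReservation> {
    vec![ExecutorReservation {
        executor_id: "executor-1".into(),
        job_id: Some("job-1".into()),
    }]
}

// Prefer tasks from the reservation's pinned job; stubbed here so that
// nothing is runnable and every reservation comes back unassigned.
fn fill_reservations(
    reservations: Vec<ExecutorReservation>,
) -> (Vec<(ExecutorReservation, TaskDefinition)>, Vec<ExecutorReservation>) {
    (Vec::new(), reservations)
}

fn main() {
    // 1. Receive a set of task updates and apply them.
    let reservations = update_task_statuses(vec![TaskStatus]);
    // 2. A SchedulerServerEvent::Offer carries the reservations to the event
    //    loop, which tries to fill each one.
    let (assigned, unassigned) = fill_reservations(reservations);
    // 3. Launch assigned tasks; cancel the rest to return slots to the pool.
    for (reservation, _task) in assigned {
        println!("launch task on {}", reservation.executor_id);
    }
    println!("returning {} slot(s) to the pool", unassigned.len());
}
```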
### Pull Scheduling
* Receive a `PollWorkRequest`
* Apply any task updates in the request using
`TaskManager::update_task_statuses`
* If the poller can accept a task, create a new reservation and call
`TaskManager::fill_reservations` to try to fill it.
* Return a `PollWorkResponse` with the task that was assigned (if any).
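And a hedged sketch of the pull path; the `PollWorkRequest`/`PollWorkResponse`
fields and the helpers below are stand-ins for the actual gRPC messages and
`TaskManager` calls:

```rust
// Stand-ins for the real message and task types; fields are assumptions.
struct TaskStatus;
struct TaskDefinition;
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}

struct PollWorkRequest {
    executor_id: String,
    task_status: Vec<TaskStatus>,
    can_accept_task: bool,
}

struct PollWorkResponse {
    task: Option<TaskDefinition>,
}

fn apply_task_statuses(_executor_id: &str, _statuses: Vec<TaskStatus>) {}
fn try_fill(_reservation: ExecutorReservation) -> Option<TaskDefinition> {
    None // stubbed: no task was ready
}

fn poll_work(req: PollWorkRequest) -> PollWorkResponse {
    // 1. Apply any task updates carried in the poll request.
    apply_task_statuses(&req.executor_id, req.task_status);
    // 2. If the poller can accept a task, create a fresh reservation for it
    //    and try to fill it from the ready tasks.
    let task = if req.can_accept_task {
        try_fill(ExecutorReservation {
            executor_id: req.executor_id,
            job_id: None,
        })
    } else {
        None
    };
    // 3. Return whatever task (if any) was assigned.
    PollWorkResponse { task }
}

fn main() {
    let resp = poll_work(PollWorkRequest {
        executor_id: "executor-1".into(),
        task_status: Vec::new(),
        can_accept_task: true,
    });
    println!("assigned a task: {}", resp.task.is_some());
}
```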
# Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
This change is mostly to the internal mechanics of the scheduler. The only
user-facing change is to the `StateBackendClient` trait.
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
Yes, the `StateBackendClient` contract is changed.