thinkharderdev opened a new pull request, #59: URL: https://github.com/apache/arrow-ballista/pull/59

# Which issue does this PR close?

Closes #39

Posting this draft PR for review and feedback. There are some TODO items still in progress (mostly around cleanup and unit test coverage), but this change passes the integration tests as is, so it should be ready for a "test-drive" now.

TODO (before merging):

- [ ] Additional unit test coverage
- [ ] Fix `ExecuteQuery` so we don't do all the planning as part of the gRPC handler (I changed this to simplify things while testing but need to revert back to the old implementation where we do the planning in an async task in the event loop)
- [ ] General cleanup and documentation

# Rationale for this change

See #39 for a complete description, but this change addresses the following issues:

1. Allow for deployments with multiple schedulers, for high availability and scalability.
2. Maintain fine-grained task state in persistent storage, so that even single-scheduler deployments are more robust to scheduler restarts.

# What changes are included in this PR?

This is quite a large refactor. See the key points below.

## Event Loop

The core event loop remains mostly unchanged. The biggest change is that stage scheduling has been mostly internalized by the `ExecutionGraph`, so we only publish `JobSubmitted` and `JobCompleted` query stage events. The structure of `SchedulerServerEvents` has also changed (see details below).

## ExecutionGraph

In the current implementation, the state of a job is spread across a number of different data structures, so it would be difficult to move to external state in its current form, especially in light of the requirements around distributed locks. In this change, the `TaskScheduler` and `StageManager` have been replaced with an `ExecutionGraph` data structure which is serializable (so it can be persisted in the state backend easily) and internalizes both the underlying execution plan and the dependencies between query stages. This data structure internalizes the order of execution and the availability of tasks, and presents an interface that allows the scheduler to see a job as a stream of discrete tasks, which I believe makes the logic in the scheduler loop more straightforward.
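To make the shape of this concrete, here is a minimal sketch of the idea. The names, fields, and signatures below are simplified illustrations for this description, not the exact API in the PR; in particular, the real graph also carries the physical plan and is (de)serializable so it can be persisted in the state backend.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the PR's real types.
#[derive(Clone)]
pub struct Task {
    pub job_id: String,
    pub stage_id: usize,
    pub partition: usize,
}

pub struct StageState {
    pub pending: Vec<usize>, // partitions not yet handed out
    pub inputs: Vec<usize>,  // upstream stage IDs this stage depends on
    pub complete: bool,
}

/// Sketch of the ExecutionGraph idea: one value per job that owns the
/// stage dependency graph and hands the scheduler runnable tasks.
pub struct ExecutionGraph {
    pub job_id: String,
    pub stages: HashMap<usize, StageState>,
}

impl ExecutionGraph {
    /// Return the next runnable task, if any: a pending partition of a
    /// stage whose upstream inputs are all complete. The scheduler never
    /// has to reason about stage ordering itself.
    pub fn pop_next_task(&mut self) -> Option<Task> {
        let ready_stage = self
            .stages
            .iter()
            .find(|(_, s)| {
                !s.pending.is_empty()
                    && s.inputs
                        .iter()
                        .all(|i| self.stages.get(i).map_or(false, |up| up.complete))
            })
            .map(|(id, _)| *id)?;
        let partition = self.stages.get_mut(&ready_stage)?.pending.pop()?;
        Some(Task {
            job_id: self.job_id.clone(),
            stage_id: ready_stage,
            partition,
        })
    }
}
```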
## ExecutorReservation

This data structure represents a reserved task slot on a given executor, which may optionally be tied to a specific job ID. The idea is that, to avoid lock contention on the backend data store, we "hang on" to task slots through the scheduler loop instead of immediately returning them to the pool. So when a scheduler gets a task status update, it has a new task slot reservation that it can try to fill, and it will only return that task slot to the pool if it cannot find a task that is ready.
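A rough sketch of the shape (hypothetical, simplified from the PR):

```rust
/// A reserved task slot on an executor, optionally pinned to a job so
/// that freshly freed slots prefer tasks from the same job.
#[derive(Debug, Clone)]
pub struct ExecutorReservation {
    pub executor_id: String,
    /// If set, try to fill this slot with a task from this job first.
    pub job_id: Option<String>,
}

impl ExecutorReservation {
    /// A slot with no job affinity, available to any job.
    pub fn new_free(executor_id: String) -> Self {
        Self { executor_id, job_id: None }
    }

    /// A slot pinned to a job, e.g. one freed by that job's completed task.
    pub fn new_assigned(executor_id: String, job_id: String) -> Self {
        Self { executor_id, job_id: Some(job_id) }
    }
}
```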
## TaskManager

The `TaskManager` encapsulates task/job management within the scheduler. The two most important things the `TaskManager` does are:

* `fill_reservations`, which takes a list of `ExecutorReservation`s and tries to assign a task to each one (with preference given to the reservation's `job_id` if present). See the doc string for details about the implementation.
* `update_task_statuses`, which applies task status updates received from the executors and returns a list of `QueryStageSchedulerEvent`s along with a list of `ExecutorReservation`s to be filled by the scheduler. See the doc string for details about the implementation.

## StateBackendClient

I've changed this trait slightly to help with this use case (see the sketch at the end of this description):

* Added the concept of a `Keyspace`, which is just a namespace for keys (we already use "namespace" for something else, so I didn't want to overload the term). Since `Keyspace` is an enum, this mostly encodes a desired structure for the storage layer and helps remove boilerplate elsewhere in the codebase.
* Made the `lock` method scoped to a `Keyspace` and `key` so we can lock individual resources. Using a single global mutex on the state is probably not scalable.
* Added a `scan` method which returns all key/values in a particular keyspace (with an optional limit).
* Added a `scan_keys` method which does the same as `scan` but only returns keys.
* Added a `put_txn` method which allows atomically updating multiple key/values. Since we do batch updates in many places, this simplifies error handling. It is implemented in both Sled (using batch operations) and etcd (using transactions).
* Added a `mv` method (because `move` is a reserved word in Rust :)) that atomically moves a key from one keyspace to another.

## The Scheduler Flow

Putting this all together, the conceptual flow in the scheduler works like so:

### Push Scheduling

* Receive a set of task updates.
* Call `TaskManager::update_task_statuses` to apply the updates and get back a set of `ExecutorReservation`s.
* Publish a `SchedulerServerEvent::Offer` event with these reservations.
* The event loop receives this event and calls `TaskManager::fill_reservations` to attempt to assign tasks to each reservation, giving priority to the jobs which were just updated.
* For assigned reservations, launch the corresponding tasks.
* For unassigned reservations, cancel them (i.e. return the task slots to the pool).

When a new job is submitted, we will try to reserve task slots up to the number of tasks in the job's initial stage and launch them.

### Pull Scheduling

* Receive a `PollWorkRequest`.
* Apply any task updates in the request using `TaskManager::update_task_statuses`.
* If the poller can accept a task, create a new reservation and call `TaskManager::fill_reservations` to try to fill it.
* Return a `PollWorkResponse` with the task that was assigned (if any).

# Are there any user-facing changes?

This change is mostly to the internal mechanics of the scheduler. The only user-facing change is to the `StateBackendClient` trait: its contract is changed, which is a breaking change to a public API.
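To close, here is a hedged sketch of roughly what the revised `StateBackendClient` surface looks like, based on the method descriptions above. The signatures are illustrative only (the real trait is async and uses Ballista's own error/Result types), and the `Keyspace` variants shown are hypothetical stand-ins for the PR's actual storage layout:

```rust
/// Hypothetical keyspace variants; the real enum encodes the PR's
/// actual storage layout.
#[derive(Debug, Clone, Copy)]
pub enum Keyspace {
    Jobs,
    Tasks,
    Slots,
}

/// Illustrative, simplified (synchronous) version of the revised trait.
pub trait StateBackendClient {
    type Lock;
    type Error: std::error::Error;

    /// Lock a single (keyspace, key) resource instead of the whole state.
    fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Self::Lock, Self::Error>;

    /// All key/value pairs in a keyspace, up to an optional limit.
    fn scan(
        &self,
        keyspace: Keyspace,
        limit: Option<usize>,
    ) -> Result<Vec<(String, Vec<u8>)>, Self::Error>;

    /// Same as `scan`, but returns only the keys.
    fn scan_keys(&self, keyspace: Keyspace) -> Result<Vec<String>, Self::Error>;

    /// Atomically apply a batch of puts (Sled batches / etcd transactions).
    fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>) -> Result<(), Self::Error>;

    /// Atomically move a key from one keyspace to another
    /// (`mv` because `move` is a reserved word).
    fn mv(&self, from: Keyspace, to: Keyspace, key: &str) -> Result<(), Self::Error>;
}
```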