thinkharderdev opened a new pull request, #59:
URL: https://github.com/apache/arrow-ballista/pull/59

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and 
enhancements and this helps us generate change logs for our releases. You can 
link an issue to this PR using the GitHub syntax. For example `Closes #123` 
indicates that this PR will close issue #123.
   -->
   
   Closes #39 
   
   Posting this draft PR for review and feedback. There are still some TODO items in progress (mostly around cleanup and unit test coverage), but this change passes the integration tests as is, so it should be ready for a "test drive" now.
   
   TODO (before merging)
   
   - [ ] Additional unit test coverage
   - [ ] Fix `ExecuteQuery` so we don't do all the planning as part of the gRPC handler (I changed this to simplify things while testing, but need to revert to the old implementation where the planning happens in an async task in the event loop)
   - [ ] General cleanup and documentation. 
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in 
the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your 
changes and offer better suggestions for fixes.  
   -->
   
   See #39 for a complete description but this change addresses the following 
issues:
   1. Allow for deployments with multiple schedulers for high-availability and 
scalability. 
   2. Maintain fine-grained task state in persistent storage so even 
single-scheduler deployments are more robust to scheduler restarts.  
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is 
sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   This is quite a large refactor. See the key points below: 
   
   ## Event Loop
   
   The core event loop remains mostly unchanged. The biggest change is that stage scheduling has been mostly internalized by the `ExecutionGraph`, so we only publish `JobSubmitted` and `JobCompleted` query stage events. Likewise, the structure of `SchedulerServerEvents` has changed (see details below).
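   For illustration, the query stage events published by the event loop can be thought of roughly like the enum below. This is a hedged sketch only; the real enum in this PR may carry additional variants and payload fields.

```rust
// Illustrative sketch only: the two query stage events the event loop now publishes.
// The actual enum in this PR may carry additional variants and payload.
#[derive(Debug, Clone)]
pub enum QueryStageSchedulerEvent {
    JobSubmitted { job_id: String },
    JobCompleted { job_id: String },
}
```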
   
   ## ExecutionGraph
   
   In the current implementation, the state of a job is spread across a number of different data structures, so it would be difficult to move to external state in its current form, especially in light of requirements around distributed locks.

   In this change, the `TaskScheduler` and `StageManager` have been replaced with an `ExecutionGraph` data structure which is serializable (so it can be persisted in the state backend easily) and internalizes both the underlying execution plan and the dependencies between query stages. This data structure tracks the order of execution and availability of tasks internally and presents an interface that allows the scheduler to see a job as a stream of discrete tasks, which I believe makes the logic in the scheduler loop more straightforward.
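   To make the shape of this interface concrete, here is a rough, hypothetical sketch of what an `ExecutionGraph`-style structure could look like. Everything beyond the `ExecutionGraph` name itself (field names, `StageState`, `Task`, method names) is an illustrative stand-in rather than the PR's actual definitions; the real structure is also serializable so it can be persisted.

```rust
use std::collections::HashMap;

// Hypothetical sketch of an ExecutionGraph-shaped structure; names are illustrative.
pub struct ExecutionGraph {
    pub job_id: String,
    // Query stages keyed by stage id, each tracking its own partition/task state.
    stages: HashMap<usize, StageState>,
    // Stage dependencies: a stage only becomes runnable once its inputs complete.
    dependencies: HashMap<usize, Vec<usize>>,
}

struct StageState {
    partitions: usize,
    completed_partitions: usize,
}

pub struct Task {
    pub job_id: String,
    pub stage_id: usize,
    pub partition: usize,
}

impl ExecutionGraph {
    /// Present the job as a stream of discrete tasks: return the next runnable
    /// task, i.e. one whose stage has all of its input stages completed.
    pub fn pop_next_task(&mut self) -> Option<Task> {
        // A real implementation would walk `dependencies`; elided in this sketch.
        None
    }

    /// The job is complete once every stage has finished all of its partitions.
    pub fn is_complete(&self) -> bool {
        self.stages
            .values()
            .all(|s| s.completed_partitions == s.partitions)
    }
}
```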
   
   ## ExecutorReservation
   
   This data structure represents a reserved task slot on a given executor, which may optionally be tied to a specific job ID. The idea is that, to avoid lock contention on the backend data store, we "hang on" to task slots through the scheduler loop instead of immediately returning them to the pool. So when a scheduler gets a task status update, it has a new task slot reservation that it can try to fill, and it will only return that task slot to the pool if it cannot find a task that is ready.
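   Conceptually (this is an illustrative sketch, not the exact struct in the PR), a reservation is just an executor ID plus an optional job ID that the slot should preferentially be filled from:

```rust
// Conceptual sketch of a reserved task slot; the actual struct in this PR may
// carry additional fields.
#[derive(Debug, Clone)]
pub struct ExecutorReservation {
    /// The executor the task slot is reserved on.
    pub executor_id: String,
    /// If present, prefer filling this slot with a task from this job.
    pub job_id: Option<String>,
}

impl ExecutorReservation {
    /// A free slot, not tied to any job.
    pub fn new_free(executor_id: String) -> Self {
        Self { executor_id, job_id: None }
    }

    /// A slot that should preferentially be filled from the given job.
    pub fn new_assigned(executor_id: String, job_id: String) -> Self {
        Self { executor_id, job_id: Some(job_id) }
    }
}
```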
   
   ## TaskManager
   
   The `TaskManager` encapsulates task/job management within the scheduler. The two most important things the `TaskManager` does are (illustrative signatures are sketched after this list):
   * `fill_reservations`, which takes a list of `ExecutorReservation`s and tries to assign a task to each one (with preference given to the reservation's `job_id` if present). See the doc string for details about the implementation.
   * `update_task_statuses`, which applies task status updates received from the executors and returns a list of `QueryStageSchedulerEvent`s along with a list of `ExecutorReservation`s to be filled by the scheduler. See the doc string for details about the implementation.
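   The sketch below shows roughly what these two entry points look like. The signatures are simplified and the placeholder types (`TaskDescription`, `TaskStatus`) are illustrative; the real `TaskManager` in this PR is async and works with richer types.

```rust
// Illustrative, simplified signatures only.
pub struct ExecutorReservation {
    pub executor_id: String,
    pub job_id: Option<String>,
}

pub enum QueryStageSchedulerEvent {
    JobSubmitted { job_id: String },
    JobCompleted { job_id: String },
}

/// Placeholder for a concrete task handed to an executor.
pub struct TaskDescription;
/// Placeholder for a task status update reported by an executor.
pub struct TaskStatus;

pub struct TaskManager;

impl TaskManager {
    /// Try to assign a task to each reservation, preferring tasks from the
    /// reservation's `job_id` when present. Returns the assignments plus any
    /// reservations that could not be filled (to be returned to the pool).
    pub fn fill_reservations(
        &self,
        reservations: Vec<ExecutorReservation>,
    ) -> (Vec<(ExecutorReservation, TaskDescription)>, Vec<ExecutorReservation>) {
        (Vec::new(), reservations)
    }

    /// Apply task status updates from the executors and return the resulting
    /// query stage events plus the freed-up slots the scheduler should try to fill.
    pub fn update_task_statuses(
        &self,
        statuses: Vec<TaskStatus>,
    ) -> (Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>) {
        let _ = statuses;
        (Vec::new(), Vec::new())
    }
}
```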
   
   ## StateBackendClient
   
   I've changed this trait slightly to help with this use case (a rough sketch of the reshaped trait follows the list):
   * Added the concept of a `Keyspace`, which is just a namespace for keys (we already use "namespace" for something else, so I didn't want to overload the term). Since `Keyspace` is an enum, this mostly encodes a desired structure for the storage layer and helps remove boilerplate elsewhere in the codebase.
   * Made the `lock` method scoped to a `Keyspace` and `key` so we can lock individual resources. Using a single global mutex on the state is probably not scalable.
   * Added a `scan` method which returns all key/values in a particular keyspace (with an optional limit).
   * Added a `scan_keys` method which does the same as `scan` but only returns keys.
   * Added a `put_txn` method which allows atomically updating multiple key/values. Since we do batch updates in many places, this simplifies error handling. It is implemented in both Sled (using batch operations) and etcd (using transactions).
   * Added a `mv` method (because `move` is a reserved word in Rust :)) that will atomically move a key from one keyspace to another.
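   The sketch below gives a rough picture of the reshaped trait. It is simplified and synchronous for readability; the real trait in this PR is async, works with raw bytes, and the `Keyspace` variants shown are examples rather than the actual list.

```rust
use std::error::Error;

// Simplified, synchronous sketch of the reshaped StateBackendClient surface.
#[derive(Debug, Clone, Copy)]
pub enum Keyspace {
    // Example variants only; the real enum encodes the scheduler's storage layout.
    Jobs,
    Executors,
    Slots,
}

/// Opaque guard: dropping it releases the lock on a (keyspace, key) pair.
pub struct Lock;

pub trait StateBackendClient {
    /// Lock an individual (keyspace, key) resource rather than the whole state.
    fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Lock, Box<dyn Error>>;

    /// Return all key/value pairs in a keyspace, with an optional limit.
    fn scan(
        &self,
        keyspace: Keyspace,
        limit: Option<usize>,
    ) -> Result<Vec<(String, Vec<u8>)>, Box<dyn Error>>;

    /// Like `scan`, but only return the keys.
    fn scan_keys(&self, keyspace: Keyspace) -> Result<Vec<String>, Box<dyn Error>>;

    /// Atomically apply a batch of puts (Sled batches / etcd transactions).
    fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>) -> Result<(), Box<dyn Error>>;

    /// Atomically move a key from one keyspace to another.
    fn mv(&self, from: Keyspace, to: Keyspace, key: &str) -> Result<(), Box<dyn Error>>;
}
```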
   
   ## The Scheduler Flow
   
   Putting this all together, the conceptual flow in the scheduler works like so:
   
   ### Push Scheduling
   
   * Receive a set of task updates
   * Call `TaskManager::update_task_statuses` to apply updates and get back a 
set of `ExecutorReservation`s
   * Publish a `SchedulerServerEvent::Offer` event with these reservations
   * The event loop will receive this event and call `TaskManager::fill_reservations` to attempt to assign tasks to each reservation, giving priority to the jobs that were just updated.
   * For assigned reservations, launch the corresponding tasks.
   * For unassigned reservations, cancel them (i.e. return the task slots to the pool).
   
   When a new job is submitted, we will try to reserve task slots up to the number of tasks in the job's initial stage and launch them. A rough sketch of the push path follows below.
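   In pseudo-Rust, the push path described above looks roughly like this. All of the free functions and types here are illustrative placeholders standing in for the `TaskManager` and event-loop machinery, not the PR's actual handler code.

```rust
// Rough sketch of the push-scheduling path; all names are illustrative placeholders.
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}
struct TaskStatus;
struct TaskDescription;

// Placeholders standing in for TaskManager::update_task_statuses / fill_reservations.
fn update_task_statuses(_updates: Vec<TaskStatus>) -> Vec<ExecutorReservation> {
    Vec::new()
}
fn fill_reservations(
    reservations: Vec<ExecutorReservation>,
) -> (Vec<(ExecutorReservation, TaskDescription)>, Vec<ExecutorReservation>) {
    (Vec::new(), reservations)
}
fn launch_task(_reservation: &ExecutorReservation, _task: &TaskDescription) {}
fn cancel_reservation(_reservation: ExecutorReservation) {}

fn on_task_updates(updates: Vec<TaskStatus>) {
    // 1. Apply the updates; each update hands us back a slot we hang on to.
    let reservations = update_task_statuses(updates);

    // 2. Conceptually published as SchedulerServerEvent::Offer and handled in the
    //    event loop: try to fill each reservation, preferring the updated jobs.
    let (assigned, unassigned) = fill_reservations(reservations);

    // 3. Launch the assigned tasks; return unfilled slots to the pool.
    for (reservation, task) in &assigned {
        launch_task(reservation, task);
    }
    for reservation in unassigned {
        cancel_reservation(reservation);
    }
}
```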
   
   ### Pull Scheduling
   * Receive a `PollWorkRequest`
   * Apply any task updates in the request using `TaskManager::update_task_statuses`
   * If the poller can accept a task, create a new reservation and call `TaskManager::fill_reservations` to try to fill it.
   * Return a `PollWorkResponse` with the task that was assigned, if any (sketched below).
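   And the pull path, again as an illustrative sketch with placeholder types rather than the actual gRPC handler:

```rust
// Rough sketch of the pull-scheduling path; all names are illustrative placeholders.
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}
struct TaskStatus;
struct TaskDescription;

struct PollWorkRequest {
    executor_id: String,
    task_status: Vec<TaskStatus>,
    can_accept_task: bool,
}
struct PollWorkResponse {
    task: Option<TaskDescription>,
}

// Placeholders standing in for the TaskManager calls described above.
fn update_task_statuses(_updates: Vec<TaskStatus>) {}
fn fill_reservation(_reservation: ExecutorReservation) -> Option<TaskDescription> {
    None
}

fn poll_work(request: PollWorkRequest) -> PollWorkResponse {
    // Apply any task status updates carried in the poll request.
    update_task_statuses(request.task_status);

    // If the polling executor can take a task, create a reservation for it and
    // try to fill it; otherwise respond without a task.
    let task = if request.can_accept_task {
        fill_reservation(ExecutorReservation {
            executor_id: request.executor_id,
            job_id: None,
        })
    } else {
        None
    };

    PollWorkResponse { task }
}
```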
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be 
updated before approving the PR.
   -->
   
   This change is mostly to the internal mechanics of the scheduler. The only 
user-facing change is to the `StateBackendClient` trait. 
   
   <!--
   If there are any breaking changes to public APIs, please add the `api 
change` label.
   -->
   
   Yes, the `StateBackendClient` contract is changed. 
   

