gianm opened a new pull request, #16168:
URL: https://github.com/apache/druid/pull/16168
This patch contains two controller changes that make progress towards a
lower-latency MSQ.
Both are large changes. I developed them together, and a single PR was more
straightforward than several separate ones, especially since both touch common
files such as `ControllerImpl` and `ControllerContext`.
**Key classes:**
These files are the most interesting IMO.
- Controller
- ControllerImpl
- ControllerQueryKernel
- ControllerQueryKernelConfig
- ControllerUtils (especially computeStageGroups)
- ControllerContext
- MSQWorkerTaskLauncher
**Key changes:**
First, support for **in-memory shuffles**. The main feature of in-memory
shuffles, as far as the controller is concerned, is that they are not fully
buffered. That means that whenever a producer stage uses in-memory output, its
consumer must run concurrently. The controller determines which stages run
concurrently, and when they start and stop.
"Leapfrogging" allows any chain of sort-based stages to use in-memory
shuffles even if we can only run two stages at once. For example, in a linear
chain of stages 0 -> 1 -> 2 where all do sort-based shuffles, we can use
in-memory shuffling for each one while only running two at once. (When stage 1
is done reading input and about to start writing its output, we can stop 0 and
start 2.)
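To make the leapfrogging schedule concrete, here is a minimal sketch for a linear chain, assuming at most two stages run at once. `LeapfrogSketch` and `schedule` are hypothetical names for illustration, not part of this patch. The real decisions are made by `ControllerQueryKernel`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; class and method names are illustrative, not Druid code.
final class LeapfrogSketch {
  /**
   * For a linear chain 0 -> 1 -> ... -> (numStages - 1) of sort-based stages,
   * returns the set of stages running at each step when at most two stages
   * may run concurrently.
   */
  static List<List<Integer>> schedule(int numStages) {
    List<List<Integer>> steps = new ArrayList<>();
    for (int producer = 0; producer + 1 < numStages; producer++) {
      // The producer streams in-memory output to its consumer. Once the
      // consumer is done reading input, the producer can be stopped and
      // the next stage started, which is the next loop iteration.
      steps.add(List.of(producer, producer + 1));
    }
    if (numStages > 0) {
      // The final stage finishes writing its output on its own.
      steps.add(List.of(numStages - 1));
    }
    return steps;
  }

  public static void main(String[] args) {
    for (List<Integer> running : schedule(3)) {
      System.out.println("running stages: " + running);
    }
  }
}
```

For the three-stage chain above, the steps are `[0, 1]`, then `[1, 2]`, then `[2]`. No step runs more than two stages.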
1) New `OutputChannelMode` enum attached to `WorkOrder` that tells workers
whether stage output should be in memory (`MEMORY`), or use local or durable
storage.
2) New logic in the `ControllerQueryKernel` to determine which stages can use
in-memory shuffling (`ControllerUtils#computeStageGroups`) and to launch them
at the appropriate time (`ControllerQueryKernel#createNewKernels`).
3) New `doneReadingInput` method on `Controller` (passed down to the stage
kernels) which allows stages to transition to `POST_READING` even if they are
not gathering statistics. This is important because it enables "leapfrogging"
for `HASH_LOCAL_SORT` shuffles, and for `GLOBAL_SORT` shuffles with 1
partition.
4) Moved result-reading from `ControllerContext#writeReports` to a new
`QueryListener` interface, which `ControllerImpl` feeds results to row-by-row
while the query is still running. This is important so we can read query
results from the final stage using an in-memory channel.
5) New class `ControllerQueryKernelConfig` holds configs that control kernel
behavior (such as whether to pipeline, maximum number of concurrent stages,
etc). Generated by the `ControllerContext`.
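As an illustration of the row-by-row delivery described in item 4, here is a minimal sketch of a push-style listener. The `RowListener` interface and its method names are assumptions made for this example and are not the actual `QueryListener` API. The point is only that rows are handed over while they are produced, rather than collected when reports are written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; names are illustrative and not Druid's QueryListener API.
final class RowListenerSketch {
  interface RowListener {
    /** Receives one result row; returns false to ask the producer to stop. */
    boolean onRow(Object[] row);

    /** Called once after the last row has been delivered. */
    void onComplete();
  }

  /** Collects rows up to a limit, then asks the producer to stop. */
  static final class CollectingListener implements RowListener {
    final List<Object[]> rows = new ArrayList<>();
    final int limit;
    boolean completed = false;

    CollectingListener(int limit) {
      this.limit = limit;
    }

    @Override
    public boolean onRow(Object[] row) {
      rows.add(row);
      return rows.size() < limit;
    }

    @Override
    public void onComplete() {
      completed = true;
    }
  }

  /** Pushes rows to the listener as they become available; returns rows delivered. */
  static int feed(Iterator<Object[]> results, RowListener listener) {
    int delivered = 0;
    while (results.hasNext()) {
      delivered++;
      if (!listener.onRow(results.next())) {
        break;
      }
    }
    listener.onComplete();
    return delivered;
  }

  public static void main(String[] args) {
    List<Object[]> results = Arrays.asList(
        new Object[]{"a", 1}, new Object[]{"b", 2}, new Object[]{"c", 3});
    CollectingListener listener = new CollectingListener(2);
    int delivered = feed(results.iterator(), listener);
    System.out.println("delivered=" + delivered + ", completed=" + listener.completed);
  }
}
```

Because the listener is fed as rows arrive, the final stage's output never needs to be fully buffered, which is what makes an in-memory channel usable for reading results.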
Second, a refactor **towards running workers in persistent JVMs** that are
able to cache data across queries. This is helpful because I believe we'll want
to reuse JVMs and cached data for latency reasons.
1) Move creation of `WorkerManager` and `TableInputSpecSlicer` to the
`ControllerContext`, rather than `ControllerImpl`. This allows managing
workers and work assignment differently when JVMs are reusable.
2) Lift the Controller Jersey resource out from `ControllerChatHandler` to a
reusable resource `ControllerResource`.
3) Move memory introspection to a `MemoryIntrospector` interface, and
introduce `ControllerMemoryParameters` that uses it. This makes it easier to
run MSQ in process types other than Indexer and Peon.
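The idea behind item 3 can be sketched as follows: memory sizing depends only on a small interface, so each process type can supply its own implementation. Everything here (`MemorySource`, its methods, and the even per-query split) is an assumption made for illustration. It is not the `MemoryIntrospector` or `ControllerMemoryParameters` API.

```java
// Hypothetical sketch; interface, methods, and sizing rule are illustrative only.
final class MemoryParametersSketch {
  interface MemorySource {
    long usableMemoryBytes();

    int concurrentQueries();
  }

  /** Reads limits from the current JVM, assuming one query per JVM. */
  static final class JvmMemorySource implements MemorySource {
    @Override
    public long usableMemoryBytes() {
      return Runtime.getRuntime().maxMemory();
    }

    @Override
    public int concurrentQueries() {
      return 1;
    }
  }

  /** Fixed values, e.g. for a shared long-lived process or for tests. */
  static final class FixedMemorySource implements MemorySource {
    final long bytes;
    final int queries;

    FixedMemorySource(long bytes, int queries) {
      this.bytes = bytes;
      this.queries = queries;
    }

    @Override
    public long usableMemoryBytes() {
      return bytes;
    }

    @Override
    public int concurrentQueries() {
      return queries;
    }
  }

  /** Splits usable memory evenly across the queries sharing the process. */
  static long perQueryBytes(MemorySource source) {
    return source.usableMemoryBytes() / Math.max(1, source.concurrentQueries());
  }

  public static void main(String[] args) {
    System.out.println(perQueryBytes(new JvmMemorySource()));
    System.out.println(perQueryBytes(new FixedMemorySource(1000L, 4)));
  }
}
```

The sizing code never asks what kind of process it runs in, so a different process type only needs a different `MemorySource`.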
Both of these areas will have follow-ups that make similar changes on the
worker side.