gianm opened a new issue, #17139:
URL: https://github.com/apache/druid/issues/17139

   # Motivation
   
   Druid 24 included a task-based multi-stage query engine proposed in #12262. 
This has proved useful for [DML (REPLACE, 
INSERT)](https://druid.apache.org/docs/latest/multi-stage-query/) and [querying 
directly from deep 
storage](https://druid.apache.org/docs/latest/querying/query-deep-storage).
   
   This proposal is to introduce the natural next evolution: an interactive 
"profile" of the engine. The same engine is configured to run interactively, 
including changes such as:
   
   - Read locally-cached data instead of pulling from deep storage.
   - Multithreaded workers inside shared JVMs, leveraging the work from #17057, 
#17048.
   - In-memory shuffles, leveraging the work from #16168, #16790, #16775.
   - No whole-worker fault-tolerance, which saves the need to checkpoint state 
to durable storage. (RPC fault tolerance through retries would still happen.)
   
   The main purpose of this engine is to provide a way to run queries that are 
too lightweight for the task-based MSQ engine to make sense, but too 
heavyweight for the standard native query engine to make sense. A good example 
would be a `GROUP BY` with an intermediate resultset of hundreds of millions of 
rows. In general this engine would specialize in the sort of midweight, ad-hoc 
queries that are common in the data warehousing world. I believe with some 
additional work it would also be possible to run lightweight, high QPS queries 
competitively with the standard native query engine.
   
   # Proposed changes
   
   ### Name
   
   In the initial PR I'll use the name **dart** for this profile of the
engine. Darts are lightweight and go fast, which are good qualities in an
interactive query engine. It even has a possible backronym: "Distributed
Asynchronous Runtime Topology".
   
   ### API
   
   Initially I'm proposing an API that is compatible with the SQL query API, to 
make it easy to try out the new engine.
   
   To issue a query, `POST /druid/v2/sql/dart/` the same form of JSON payload
that would be accepted by `/druid/v2/sql/`. Results are also in the same
format. This is a synchronous API, although internally the engine is
asynchronous, so it is definitely possible to introduce an asynchronous API
later on.
   
   To issue a query and also return a
[report](https://druid.apache.org/docs/latest/api-reference/sql-ingestion-api#report-response-fields)
with stages, counters, etc., `POST /druid/v2/sql/dart/?fullReport`. This is
like an `EXPLAIN ANALYZE`. The report is in the same format as the reports
generated by the task-based engine.
   
   To see a list of running queries (a feature that the native engine does not 
have), `GET /druid/v2/sql/dart/`.
   
   To cancel a query, `DELETE /druid/v2/sql/dart/{sqlQueryId}`.
   
   To check if the engine is enabled, `GET /druid/v2/sql/dart/enabled` (returns 
200 or 404).
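
   As a concrete sketch of the endpoints above, here is how they might be
called from Python using only the standard library. The Broker address, the
example query, and the `some-sql-query-id` placeholder are all hypothetical;
the helper builds requests without sending them, and the commented-out
`urlopen` line shows how one would actually issue them against a live cluster:

   ```python
   import json
   import urllib.request

   BROKER = "http://localhost:8082"  # hypothetical Broker address

   def dart_request(method, path, payload=None):
       """Build (but do not send) a request for a proposed Dart endpoint."""
       data = json.dumps(payload).encode() if payload is not None else None
       return urllib.request.Request(
           BROKER + path,
           data=data,
           method=method,
           headers={"Content-Type": "application/json"},
       )

   # Issue a query: same JSON payload shape as /druid/v2/sql/.
   submit = dart_request("POST", "/druid/v2/sql/dart/",
                         {"query": "SELECT COUNT(*) FROM wikipedia"})

   # Same query, but return a full report with stages, counters, etc.
   report = dart_request("POST", "/druid/v2/sql/dart/?fullReport",
                         {"query": "SELECT COUNT(*) FROM wikipedia"})

   # List running queries, cancel one, and check whether Dart is enabled.
   listing = dart_request("GET", "/druid/v2/sql/dart/")
   cancel = dart_request("DELETE", "/druid/v2/sql/dart/some-sql-query-id")
   enabled = dart_request("GET", "/druid/v2/sql/dart/enabled")

   # To actually send a request against a live cluster:
   # with urllib.request.urlopen(submit) as resp:
   #     print(resp.read().decode())
   ```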
   
   ### Servers and resource management
   
   Controllers run on Brokers (one per query) and the workers run on 
Historicals. Resource management would be bare-bones in the initial version, 
limited to simple controls on the number of concurrent queries that can execute 
on each server.
   
   On Brokers, there are three configs:
   
   - `druid.msq.dart.enabled = true` to enable Dart.
   - `druid.msq.dart.controller.concurrentQueries` provides a limit to the 
number of query controllers that can run concurrently on that Broker. 
Additional controllers beyond this number queue up. Default is 1.
   - `druid.msq.dart.query.context.targetPartitionsPerWorker` sets the number 
of partitions per worker to create during a shuffle. Generally this should be 
set to the number of threads available on workers, so they can process shuffled 
data fully multithreaded.
   
   Brokers only run controllers, so they do not need meaningful CPU or memory 
resources beyond what is needed to gather partition statistics for global 
sorts. (And anyway, I'd like to use fewer global sorts in the future; see 
"Future work" around `hashLocalSort`.)
   
   On Historicals, there are three configs:
   
   - `druid.msq.dart.enabled = true` to enable Dart.
   - `druid.msq.dart.worker.concurrentQueries` provides a limit to the number
of query workers that can run concurrently on that Historical. Default is equal
to the number of merge buffers, because each query needs one merge buffer.
Ideally this should be set to something equal to, or larger than, the sum of
the `concurrentQueries` settings on all Brokers.
   - `druid.msq.dart.worker.heapFraction` provides a limit to the amount of 
heap used across all Dart queries. The default is 0.35, or 35% of heap.
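
   And the corresponding Historical-side fragment, again with illustrative
values (assuming two Brokers each configured with `concurrentQueries = 2`, so
at least 4 concurrent workers):

   ```properties
   # Enable the Dart profile on this Historical.
   druid.msq.dart.enabled=true

   # At least the sum of controller.concurrentQueries across all Brokers
   # (here, 2 Brokers x 2 = 4). Default: the number of merge buffers.
   druid.msq.dart.worker.concurrentQueries=4

   # Cap Dart's total heap usage at 35% of the JVM heap (the default).
   druid.msq.dart.worker.heapFraction=0.35
   ```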
   
   The resource management model is very simple in the initial version. I 
expect it will evolve over time:
   
   - Concurrency: limit on concurrent queries at the Broker and at each 
Historical, given by server configuration.
   - Broker HTTP threads: each query currently ties up an HTTP thread. But it 
doesn't necessarily need to; this is only happening because of hooking into the 
existing SqlResource code. The engine is internally async.
   - Historical HTTP threads: not tied up; the Broker-to-Historical protocol is 
async.
   - Memory: each concurrently-running query gets one merge buffer and one 
slice of heap. Each query gets the same size of heap slice. Standard processing 
buffers are not used.
   - CPU: each query can use all available CPUs in a fine-grained way. Because 
memory is allocated to the query, not to a processing thread, it is possible to 
time-slice the processing pool more finely than with the standard query engine.
   - Disk: usage is currently uncontrolled; it is possible to fill up local 
disk with a heavyweight enough query.
   - No timeouts, priorities, or lanes. (yet.)
   - The "Future work" section includes thoughts on how resource management can 
evolve over time.
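
   To illustrate the memory model with some back-of-the-envelope arithmetic
(my own numbers, under the assumption that the Dart heap pool is divided
evenly among the concurrency slots, per "each query gets the same size of
heap slice" above):

   ```python
   # Illustrative arithmetic for the per-query heap slice, assuming the
   # Dart heap pool (heapFraction of the JVM heap) is divided evenly
   # across the worker.concurrentQueries slots. All values hypothetical.

   GIB = 1024 ** 3

   jvm_heap = 16 * GIB          # -Xmx16g on the Historical
   heap_fraction = 0.35         # druid.msq.dart.worker.heapFraction (default)
   concurrent_queries = 4       # druid.msq.dart.worker.concurrentQueries

   dart_pool = jvm_heap * heap_fraction
   per_query_slice = dart_pool / concurrent_queries

   print(f"Dart pool: {dart_pool / GIB:.2f} GiB")              # 5.60 GiB
   print(f"Per-query slice: {per_query_slice / GIB:.2f} GiB")  # 1.40 GiB
   ```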
   
   The initial version does not run on realtime tasks, meaning realtime data is 
not included in queries.
   
   # Operational impact
   
   None if the engine is disabled or no Dart queries are being issued. When
Dart queries are running, on Historicals they use the same merge buffers and
processing pool as regular native queries, so they could potentially conflict
with other queries that need those resources. They also use up to 35% of heap
space while actually running.
   
   On Brokers, Dart queries use the same HTTP threads as regular native 
queries, and could conflict there as well.
   
   The API and all configuration parameters should be considered experimental 
and subject to breaking changes in upcoming Druid releases, as the initial 
version of the feature evolves. The ability for Dart queries to function
properly in a mixed-version environment (such as during a rolling update) is
also not guaranteed for these initial experimental releases. Nevertheless,
this would have no impact on regular queries.
   
   # Future work
   
   Some thoughts on future work items.
   
   System:
   
   - The task-based profile _always_ pulls data from deep storage, and the 
initial version of the interactive profile _always_ uses locally pre-cached 
data. Some hybrid is a clear next move.
   - Include realtime data, using workers running on realtime tasks.
   - Graceful shutdown of Historicals (currently, the async nature of the API 
means that when a Historical is TERMed, it exits immediately, even if a query 
is in flight).
   
   Resource management:
   
   - Set the "concurrentQueries" and "targetPartitionsPerWorker" parameters 
automatically based on available resources. We should allow user-supplied 
configuration but it should not be required to get a good baseline level of 
performance.
   - Implement timeouts, priorities, and lanes.
   - Allow queries to burst up to an "attic" of additional memory. (Currently 
all queries get the same amount of memory, and need to use disk when they run 
out.)
   - Automatic reprioritization or relaning of queries based on runtime 
characteristics. Experience tells us that it is difficult for users to set good 
priorities on all queries before they are issued. There is a need for the 
system to determine the appropriate priority on its own. I think this will need 
to involve some degree of canceling or suspending, and then restarting, 
expensive queries in some cases.
   - Release workers when they are no longer needed. (Currently workers are 
held even if not all are needed for future stages.)
   - Controls on disk usage.
   
   Performance items:
   
   - Multithread `hashLocalSort` shuffles. Currently only one partition is 
sorted at a time, even on a multithreaded worker. This is the main reason the 
initial version is using `globalSort` so much, even though `globalSort` 
involves more overhead on the controller.
   - Use `hashLocalSort` for aggregation rather than `globalSort`, once it's 
multithreaded, to reduce dependency on the controller and on statistics 
gathering.
   - Aggregate (combine) opportunistically during sorting.
   - For aggregation, use per-thread hash tables that persist across segments. 
Currently each segment is processed with its own hash table, then the contents 
of the hash tables are sorted. It would be better to continue the hash 
aggregation as far as possible.
   - Improve maximum possible QPS by reducing overheads on simple queries like 
`SELECT COUNT(*) FROM tbl`.
   - Add QueryMetrics to reports for better insight into performance.

