gianm opened a new issue, #17139: URL: https://github.com/apache/druid/issues/17139
# Motivation

Druid 24 included a task-based multi-stage query engine proposed in #12262. This has proved useful for [DML (REPLACE, INSERT)](https://druid.apache.org/docs/latest/multi-stage-query/) and [querying directly from deep storage](https://druid.apache.org/docs/latest/querying/query-deep-storage). This proposal is to introduce the natural next evolution: an interactive "profile" of the engine. The same engine is configured to run interactively, including changes such as:

- Read locally-cached data instead of pulling from deep storage.
- Multithreaded workers inside shared JVMs, leveraging the work from #17057, #17048.
- In-memory shuffles, leveraging the work from #16168, #16790, #16775.
- No whole-worker fault tolerance, which saves the need to checkpoint state to durable storage. (RPC fault tolerance through retries would still happen.)

The main purpose of this engine is to provide a way to run queries that are too lightweight for the task-based MSQ engine to make sense, but too heavyweight for the standard native query engine to make sense. A good example would be a `GROUP BY` with an intermediate result set of hundreds of millions of rows. In general this engine would specialize in the sort of midweight, ad-hoc queries that are common in the data warehousing world. I believe with some additional work it would also be possible to run lightweight, high-QPS queries competitively with the standard native query engine.

# Proposed changes

### Name

In the initial PR I'll use the name **dart** for this profile of the engine. Darts are lightweight and go fast, which are good qualities in an interactive query engine. It even has a possible backronym: "Distributed Asynchronous Runtime Topology".

### API

Initially I'm proposing an API that is compatible with the SQL query API, to make it easy to try out the new engine.

To issue a query, `POST /druid/v2/sql/dart/` the same form of JSON payload that would be accepted by `/druid/v2/sql/`.
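As a minimal sketch of what issuing such a query could look like from a client, assuming a Broker at `localhost:8082` (the host, port, and SQL are made-up example values; the endpoint and payload shape, the same one `/druid/v2/sql/` accepts, are from this proposal):

```python
# Hypothetical client-side sketch using only the Python stdlib.
import json
import urllib.request

BROKER = "http://localhost:8082"  # assumed Broker address

def dart_request(sql: str, full_report: bool = False) -> urllib.request.Request:
    """Build the POST for /druid/v2/sql/dart/ (add ?fullReport for a report)."""
    url = BROKER + "/druid/v2/sql/dart/"
    if full_report:
        url += "?fullReport"  # EXPLAIN ANALYZE-style report with stages, counters
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )

req = dart_request("SELECT channel, COUNT(*) FROM wikipedia GROUP BY 1")
print(req.get_method(), req.full_url)  # POST http://localhost:8082/druid/v2/sql/dart/
```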
Results are also in the same format. This is a synchronous API, although internally the engine is asynchronous, so it is definitely possible to introduce an asynchronous API later on.

To issue a query and also return a [report](https://druid.apache.org/docs/latest/api-reference/sql-ingestion-api#report-response-fields) with stages, counters, etc., `POST /druid/v2/sql/dart/?fullReport`. This is like an `EXPLAIN ANALYZE`. The report is in the same format as the reports generated by the task-based engine.

To see a list of running queries (a feature that the native engine does not have), `GET /druid/v2/sql/dart/`.

To cancel a query, `DELETE /druid/v2/sql/dart/{sqlQueryId}`.

To check if the engine is enabled, `GET /druid/v2/sql/dart/enabled` (returns 200 or 404).

### Servers and resource management

Controllers run on Brokers (one per query) and the workers run on Historicals. Resource management would be bare-bones in the initial version, limited to simple controls on the number of concurrent queries that can execute on each server.

On Brokers, there are three configs:

- `druid.msq.dart.enabled = true` to enable Dart.
- `druid.msq.dart.controller.concurrentQueries` provides a limit to the number of query controllers that can run concurrently on that Broker. Additional controllers beyond this number queue up. Default is 1.
- `druid.msq.dart.query.context.targetPartitionsPerWorker` sets the number of partitions per worker to create during a shuffle. Generally this should be set to the number of threads available on workers, so they can process shuffled data fully multithreaded.

Brokers only run controllers, so they do not need meaningful CPU or memory resources beyond what is needed to gather partition statistics for global sorts. (And anyway, I'd like to use fewer global sorts in the future; see "Future work" around `hashLocalSort`.)

On Historicals, there are three configs:

- `druid.msq.dart.enabled = true` to enable Dart.
- `druid.msq.dart.worker.concurrentQueries` provides a limit to the number of query workers that can run concurrently on that Historical. Default is equal to the number of merge buffers, because each query needs one merge buffer. Ideally this should be set to something equal to, or larger than, the sum of the `concurrentQueries` setting on all Brokers.
- `druid.msq.dart.worker.heapFraction` provides a limit to the amount of heap used across all Dart queries. The default is 0.35, or 35% of heap.

The resource management model is very simple in the initial version. I expect it will evolve over time:

- Concurrency: limit on concurrent queries at the Broker and at each Historical, given by server configuration.
- Broker HTTP threads: each query currently ties up an HTTP thread. But it doesn't necessarily need to; this is only happening because of hooking into the existing SqlResource code. The engine is internally async.
- Historical HTTP threads: not tied up; the Broker-to-Historical protocol is async.
- Memory: each concurrently-running query gets one merge buffer and one slice of heap. Each query gets the same size of heap slice. Standard processing buffers are not used.
- CPU: each query can use all available CPUs in a fine-grained way. Because memory is allocated to the query, not to a processing thread, it is possible to time-slice the processing pool more finely than with the standard query engine.
- Disk: usage is currently uncontrolled; it is possible to fill up local disk with a heavyweight enough query.
- No timeouts, priorities, or lanes (yet).
- The "Future work" section includes thoughts on how resource management can evolve over time.

The initial version does not run on realtime tasks, meaning realtime data is not included in queries.

# Operational impact

None if the engine is disabled or if queries are not being issued.
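To make the per-query heap budget in the resource model above concrete, here is a back-of-the-envelope sketch of the equal heap slicing on a Historical. The 16 GiB heap and 4 concurrent queries are assumed example values; 0.35 is the default `druid.msq.dart.worker.heapFraction` from this proposal.

```python
# Illustrative arithmetic only: equal heap slices per concurrent Dart query,
# as described in the resource-management model above.

def dart_heap_slice_bytes(jvm_heap_bytes: int, heap_fraction: float,
                          concurrent_queries: int) -> int:
    """Heap available to one Dart query under equal slicing."""
    return int(jvm_heap_bytes * heap_fraction) // concurrent_queries

# Example: 16 GiB heap, default fraction 0.35, 4 concurrent queries.
slice_mib = dart_heap_slice_bytes(16 * 1024**3, 0.35, 4) // 1024**2
print(slice_mib, "MiB per query")  # 1433 MiB per query
```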
If queries are being issued: on Historicals, Dart queries use the same merge buffers and processing pool as regular native queries, so they would potentially conflict with other queries that need those resources. They also use up to 35% of heap space when actually running. On Brokers, Dart queries use the same HTTP threads as regular native queries, and could conflict there as well.

The API and all configuration parameters should be considered experimental and subject to breaking changes in upcoming Druid releases, as the initial version of the feature evolves. The ability for Dart queries to function properly in a mixed-version environment (such as during a rolling update) is also not guaranteed for these initial experimental releases. Nevertheless, this would have no impact on regular queries.

# Future work

Some thoughts on future work items.

System:

- The task-based profile _always_ pulls data from deep storage, and the initial version of the interactive profile _always_ uses locally pre-cached data. Some hybrid is a clear next move.
- Include realtime data, using workers running on realtime tasks.
- Graceful shutdown of Historicals (currently, the async nature of the API means that when a Historical is TERMed, it exits immediately, even if a query is in flight).

Resource management:

- Set the "concurrentQueries" and "targetPartitionsPerWorker" parameters automatically based on available resources. We should allow user-supplied configuration, but it should not be required to get a good baseline level of performance.
- Implement timeouts, priorities, and lanes.
- Allow queries to burst up to an "attic" of additional memory. (Currently all queries get the same amount of memory, and need to use disk when they run out.)
- Automatic reprioritization or relaning of queries based on runtime characteristics. Experience tells us that it is difficult for users to set good priorities on all queries before they are issued.
There is a need for the system to determine the appropriate priority on its own. I think this will need to involve some degree of canceling or suspending, and then restarting, expensive queries in some cases.
- Release workers when they are no longer needed. (Currently workers are held even if not all are needed for future stages.)
- Controls on disk usage.

Performance items:

- Multithread `hashLocalSort` shuffles. Currently only one partition is sorted at a time, even on a multithreaded worker. This is the main reason the initial version is using `globalSort` so much, even though `globalSort` involves more overhead on the controller.
- Use `hashLocalSort` for aggregation rather than `globalSort`, once it's multithreaded, to reduce dependency on the controller and on statistics gathering.
- Aggregate (combine) opportunistically during sorting.
- For aggregation, use per-thread hash tables that persist across segments. Currently each segment is processed with its own hash table, then the contents of the hash tables are sorted. It would be better to continue the hash aggregation as far as possible.
- Improve maximum possible QPS by reducing overheads on simple queries like `SELECT COUNT(*) FROM tbl`.
- Add QueryMetrics to reports for better insight into performance.
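The "per-thread hash tables that persist across segments" idea above can be sketched in miniature (plain Python, not Druid code; the segment contents are made-up example data). One table keeps absorbing rows from every segment assigned to a thread and is sorted only once at the end, instead of sort-merging a fresh table per segment:

```python
# Toy illustration of continuing hash aggregation across segments.
from collections import defaultdict

def aggregate_segments(segments):
    table = defaultdict(int)            # group key -> running SUM
    for segment in segments:            # segments assigned to this thread
        for key, value in segment:      # (key, value) rows
            table[key] += value         # aggregation continues across segments
    return sorted(table.items())        # single sort, at the very end

segments = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
print(aggregate_segments(segments))  # [('a', 4), ('b', 2), ('c', 4)]
```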
