jihoonson opened a new issue #5543: [Proposal] Native parallel batch indexing
URL: https://github.com/apache/incubator-druid/issues/5543

We currently have two types of batch index tasks: the local index task and the Hadoop index task. A Spark task is also available as a [third-party extension](https://github.com/metamx/druid-spark-batch).

## Motivation

All task types have limitations. The Hadoop/Spark index task requires an external Hadoop/Spark cluster, and this kind of dependency on external systems usually causes issues such as:

1. Users must operate an external cluster.
2. The library versions used by Druid and Hadoop/Spark must match.
3. The external cluster must be large enough to run Druid batch jobs without degrading the performance of other workloads.

Meeting all of these requirements has been painful for people who have just started to try Druid.

The local index task doesn't depend on any external systems, but it runs with a single thread and thus doesn't fit practical use cases.

As a result, we need a new task type that meets both requirements: no dependency on external systems, and the capability of parallel indexing. This is what we're already doing in Kafka ingestion.

## Goals

The goal of this proposal is to introduce new parallel indexing methods. It is not to replace the existing Hadoop task with new ones; it's about providing more cool options to Druid users.

- Single-phase parallel indexing without shuffling intermediate data.
- Two-phase parallel indexing with shuffling intermediate data. This can be used for perfect rollup.
- The new indexing methods don't depend on other systems and use only the resources of the Druid cluster (`peons`).
- The new indexing methods work based on Druid's existing parallel task execution mechanism.
- Every new task type is submitted to the overlord and treated as a normal task type. There should be no special handling for these task types.
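To illustrate why the two-phase method enables perfect rollup: rows that share the same time bucket and dimension values must be routed to the same second-phase worker so they can be merged into a single aggregated row. The sketch below is purely illustrative (the class and method names are hypothetical, not Druid APIs) and shows hash partitioning on the rollup key:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of shuffle partitioning for perfect rollup.
// Rows with identical (time bucket, dimension values) must always land in
// the same partition, so a single second-phase worker sees all of them and
// can roll them up into one row.
public class RollupPartitioner {
    // Assign a row to one of numPartitions second-phase workers by hashing
    // its time bucket and dimension values.
    public static int partitionFor(long timeBucket, List<String> dims, int numPartitions) {
        int hash = Objects.hash(timeBucket, dims);
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // Two rows with the same rollup key always map to the same partition.
        int p1 = partitionFor(1000L, Arrays.asList("wikipedia", "us"), 8);
        int p2 = partitionFor(1000L, Arrays.asList("wikipedia", "us"), 8);
        System.out.println(p1 == p2); // prints "true"
    }
}
```

Single-phase indexing skips this routing step, so rows with the same key may end up in segments produced by different workers, yielding only best-effort rollup.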
## Design

Each new parallel indexing method, with or without shuffle, consists of two task types: a supervisor task and its worker tasks. Once a supervisor task is submitted to an overlord by a user, it internally submits its worker tasks to the overlord. Worker tasks read input data, generate segments, and push them. Once they finish their work, they report the list of pushed segments to the supervisor task.

The supervisor task monitors the worker task statuses. If one of them fails, the supervisor task retries the failed one until the number of retries reaches a preconfigured threshold. Once all worker tasks succeed, the supervisor task collects the pushed segments from all worker tasks and publishes them atomically.

In both indexing methods, the parallelism of the initial phase is decided by the input firehose. To do this, a _splittable_ firehose is introduced. A splittable firehose is responsible for telling the supervisor task how the input can be split. The supervisor task generates worker tasks according to the splittable firehose implementation.

In two-phase parallel indexing, the supervisor task submits the worker tasks of the second phase once the first phase completes. The second-phase workers read the intermediate results of the first-phase workers and generate segments. The parallelism of the second phase can be decided by the size of the intermediate data. Thus, the supervisor should be capable of collecting the size of the intermediate data from all worker tasks and adjusting the parallelism accordingly. To support shuffle, the intermediate results of the first phase must be kept until the second phase completes.
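The supervisor/worker flow above can be sketched as follows. This is an illustrative toy, not Druid code: the interface and method names are invented for the sketch, workers run sequentially for clarity (real workers run as parallel peon tasks), and "publishing" is reduced to returning the collected segment list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Illustrative sketch of the supervisor task loop described above:
// one worker per input split, retries up to a threshold, and a final
// collection of all pushed segments for atomic publishing.
public class SupervisorSketch {
    // One unit of work derived from the input (hypothetical interface).
    interface InputSplit { String id(); }

    // A splittable firehose tells the supervisor how the input divides.
    interface SplittableFirehoseFactory {
        List<InputSplit> getSplits();
    }

    // Run one worker per split, retrying failures; a worker returns the
    // segment it pushed, or empty on failure. Sequential here for clarity;
    // in the proposal, workers are separate tasks running in parallel.
    static List<String> runSupervisor(SplittableFirehoseFactory firehose,
                                      Function<InputSplit, Optional<String>> runWorker,
                                      int maxRetries) {
        List<String> pushedSegments = new ArrayList<>();
        for (InputSplit split : firehose.getSplits()) {
            Optional<String> segment = Optional.empty();
            for (int attempt = 0; attempt <= maxRetries && !segment.isPresent(); attempt++) {
                segment = runWorker.apply(split); // empty = this attempt failed
            }
            if (!segment.isPresent()) {
                throw new RuntimeException("split " + split.id() + " failed after retries");
            }
            pushedSegments.add(segment.get());
        }
        // In Druid, all collected segments would be published atomically here.
        return pushedSegments;
    }

    public static void main(String[] args) {
        SplittableFirehoseFactory firehose = () -> Arrays.asList(
            (InputSplit) () -> "split-0", (InputSplit) () -> "split-1");
        // A worker that succeeds immediately, pushing one segment per split.
        List<String> segments = runSupervisor(
            firehose, split -> Optional.of("segment-" + split.id()), 3);
        System.out.println(segments); // prints "[segment-split-0, segment-split-1]"
    }
}
```

The key property the sketch shows is that the supervisor alone decides the degree of parallelism (from the firehose's splits) and is the only place where segments are published, which is what makes atomic publishing and per-split retry possible.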
## Implementation Plan

- [Single-phase parallel indexing without shuffle](https://github.com/druid-io/druid/pull/5492)
- A general shuffle system available to both the indexing and querying systems in Druid
  - The shuffle system should be available for two-phase parallel indexing.
  - The shuffle system should also be available for Druid's querying system. This can be used for faster query processing when the size of intermediate data is large.
- Two-phase parallel indexing with shuffle

## Out of Scope of this Proposal

- This proposal doesn't create a new general processing framework like Hadoop or Spark. We don't need to reinvent the wheel.
- It doesn't replace the existing Hadoop tasks with new ones. They are still great.
- It doesn't provide better task scheduling in overlords, like YARN's fair scheduler. This is needed, but should be done separately.
- It doesn't handle single points of failure of the supervisor tasks. This might be done separately if needed in the future.
