jihoonson opened a new issue #5543: [Proposal] Native parallel batch indexing
URL: https://github.com/apache/incubator-druid/issues/5543
 
 
   We currently have two types of batch index tasks: the local index task and 
the Hadoop index task. A Spark task is also available via a [third-party 
extension](https://github.com/metamx/druid-spark-batch). 
   
   ## Motivation
   
   All of these task types have limitations. The Hadoop/Spark index task 
requires an external Hadoop/Spark cluster to run. This kind of dependency on 
external systems usually causes issues such as:
   
   1. users must operate an external cluster, 
   2. the library versions used by Druid and Hadoop/Spark must match, and 
   3. the external cluster must be large enough to run Druid batch jobs 
without degrading the performance of other workloads.
   
   Meeting all of these requirements has been painful for people who have just 
started trying Druid.
   
   The local index task doesn't depend on any external systems, but it runs 
with a single thread and thus doesn't fit practical use cases.
   
   As a result, we need a new task type that has no dependency on external 
systems and is capable of parallel indexing. This is similar to what we're 
already doing for Kafka ingestion.
   
   ## Goals
   
   The goal of this proposal is to introduce new parallel indexing methods. It 
is not to replace the existing Hadoop task; it is to provide more options to 
Druid users.
   
   - Single phase parallel indexing without shuffling intermediate data
   - Two phase parallel indexing with shuffling intermediate data. This can be 
used for perfect rollup.
   - The new indexing methods don't depend on other systems and use only the 
resources of the Druid cluster (`peons`).
   - The new indexing methods work on top of Druid's existing parallel task 
execution mechanism.
     - Every new task type is submitted to the overlord and treated as a 
normal task type. There should be no special handling for these task types.
   
   ## Design
   
   Each new parallel indexing method, with or without shuffle, consists of two 
task types: a supervisor task and its worker tasks. Once a supervisor task is 
submitted to the overlord by a user, it internally submits its worker tasks to 
the overlord. 
   
   Worker tasks read input data, generate segments, and push them. Once they 
finish their work, they report the list of pushed segments to the supervisor 
task. 
   
   The supervisor task monitors the statuses of its worker tasks. If one of 
them fails, the supervisor task retries the failed task until the number of 
retries reaches a preconfigured threshold. Once all worker tasks succeed, the 
supervisor task collects the pushed segments from all worker tasks and 
publishes them atomically. 
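The monitor-retry-publish loop described above can be sketched as a minimal Python simulation. This is not Druid code: `run_worker`, `max_retries`, and the sequential loop are illustrative assumptions (real worker tasks would run concurrently on peons).

```python
def run_supervisor(splits, run_worker, max_retries=3):
    """Hypothetical sketch of the supervisor task's control loop.

    Runs one worker per input split, retrying each failed worker until a
    preconfigured threshold, then returns every pushed segment so the
    caller can publish them all in one atomic step.
    """
    all_segments = []
    for split in splits:
        for attempt in range(max_retries + 1):
            try:
                # A worker reads its split, generates and pushes segments,
                # and reports the list of pushed segments back.
                all_segments.extend(run_worker(split))
                break
            except Exception:
                if attempt == max_retries:
                    raise RuntimeError(
                        f"worker for {split!r} failed after {max_retries} retries")
    # Only reached when every worker succeeded: publish atomically.
    return all_segments
```

Note that nothing is published until the whole list is assembled, which mirrors the all-or-nothing publishing step in the text.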
   
   In both indexing methods, the parallelism of the initial phase is decided by 
the input firehose. To support this, a _splittable_ firehose is introduced. A 
splittable firehose is responsible for letting the supervisor task know how the 
input can be split. The supervisor task generates worker tasks according to the 
splittable firehose implementation.
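As an illustration of the idea (not the actual Druid interface), a splittable firehose could enumerate its splits like this, with the supervisor creating one first-phase worker per split. The class and method names here are hypothetical:

```python
from dataclasses import dataclass

@dataclass
class InputSplit:
    """One independently processable unit of input (e.g., one file)."""
    location: str

class SplittableFirehoseFactory:
    """Hypothetical sketch: tells the supervisor how the input can be split."""

    def __init__(self, locations):
        self.locations = locations

    def get_splits(self):
        # One split per input location; the supervisor spawns one
        # first-phase worker task per split, so the parallelism of the
        # initial phase equals the number of splits.
        return [InputSplit(loc) for loc in self.locations]

factory = SplittableFirehoseFactory(["part-0.json", "part-1.json", "part-2.json"])
splits = factory.get_splits()
# With three input files, the supervisor would submit three worker tasks.
```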
   
   In two phase parallel indexing, the supervisor task submits the worker tasks 
of the second phase once the first phase completes. The second phase workers 
read the intermediate result of the first phase workers and generate segments. 
The parallelism of the second phase can be decided by the size of the 
intermediate data. Thus, the supervisor task should be capable of collecting 
the sizes of intermediate data from all worker tasks and adjusting the 
parallelism accordingly. To support shuffle, the intermediate result of the 
first phase should be kept until the second phase completes. 
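One simple way the supervisor could derive the second-phase parallelism from the reported intermediate data sizes is sketched below; the per-task target size is a hypothetical tuning knob, not an existing Druid configuration:

```python
import math

def second_phase_parallelism(intermediate_sizes_bytes, target_bytes_per_task):
    """Pick a second-phase worker count so that each task reads roughly
    target_bytes_per_task of the intermediate data reported by the
    first-phase workers."""
    total = sum(intermediate_sizes_bytes)
    # At least one worker, even when the intermediate data is tiny.
    return max(1, math.ceil(total / target_bytes_per_task))

# Example: three first-phase workers report 300 MB, 500 MB, and 200 MB of
# intermediate data; with a 400 MB target per task, ceil(1000 / 400) = 3
# second-phase workers would be submitted.
n_workers = second_phase_parallelism([300, 500, 200], target_bytes_per_task=400)
```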
   
   ## Implementation Plan
   
   - [Single phase parallel indexing without 
shuffle](https://github.com/druid-io/druid/pull/5492)
   - A general shuffle system available to both the indexing and querying 
systems in Druid
     - The shuffle system should be available for two phase parallel indexing.
     - The shuffle system should also be available to Druid's querying system. 
This can be used for faster query processing when the size of intermediate data 
is large.
   - Two phase parallel indexing with shuffle
   
   ## Out of scope of this Proposal
   
   - Doesn't create a new general processing framework like Hadoop or Spark. 
We don't need to reinvent the wheel.
   - Doesn't replace the existing Hadoop tasks with new ones. They are still 
great.
   - Doesn't improve task scheduling in the overlord (e.g., something like 
YARN's fair scheduler). This is needed, but should be done separately.
   - Doesn't handle the single point of failure of the supervisor task. This 
might be done separately if needed in the future.
