jihoonson opened a new issue #9241: Append support for hash/range-partitioned 
segments
URL: https://github.com/apache/druid/issues/9241
 
 
   ### Motivation
   
   Druid currently doesn't support appending hash/range-partitioned segments, 
even though this is often required in production.
   
   ### Proposed changes
   
   This proposal is to support appending segments only when the new segments 
are partitioned in the same way as the existing ones. For example, you can 
append new hash-partitioned segments to an existing hash-partitioned datasource 
if they are partitioned using exactly the same hash function. However, you 
cannot append range-partitioned segments to a hash-partitioned datasource. You 
can always append linearly partitioned segments to a linearly partitioned 
datasource.
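
The rule above can be sketched as a small compatibility check. This is only 
an illustration: `PartitioningType`, `PartitioningScheme`, and the 
`fingerprint` string are hypothetical names, not Druid classes, and the 
fingerprint is an assumed stand-in for "exactly the same hash function" (or, 
by analogy, the same range boundaries).

```java
import java.util.Objects;

// Hypothetical types for illustration only; these are not Druid classes.
enum PartitioningType { LINEAR, HASH, RANGE }

final class PartitioningScheme
{
  final PartitioningType type;
  // Assumed stand-in for the hash function or range boundaries, for example
  // a canonical string of the partition dimensions and the bucket count.
  final String fingerprint;

  PartitioningScheme(PartitioningType type, String fingerprint)
  {
    this.type = type;
    this.fingerprint = fingerprint;
  }

  /** Returns true if segments using {@code incoming} may be appended to this scheme. */
  boolean canAppend(PartitioningScheme incoming)
  {
    if (type != incoming.type) {
      return false; // e.g. range-partitioned segments onto a hash-partitioned datasource
    }
    if (type == PartitioningType.LINEAR) {
      return true; // linear onto linear is always allowed
    }
    // Hash or range: the partitioning details must match exactly.
    return Objects.equals(fingerprint, incoming.fingerprint);
  }
}
```

For example, two `HASH` schemes with the same fingerprint are compatible, 
while a `RANGE` scheme is rejected against a `HASH` scheme regardless of its 
fingerprint.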
   
   In this proposal, I would like to use the term "partition ID" to represent 
the last number in the segment ID. For example, given a segment 
`myDatasource_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2020-01-02T02:23:54.235Z_2`,
 its partition ID is 2. The partition ID is regarded as 0 if it's missing in 
the segment ID.
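
As a minimal sketch of this definition, the partition ID can be read from 
the text after the last underscore, falling back to 0 when that text is not a 
plain number. `SegmentIdSketch` and `partitionIdOf` are hypothetical names, and 
this is not how Druid itself parses segment IDs; it only mirrors the rule 
stated above.

```java
final class SegmentIdSketch
{
  private SegmentIdSketch() {}

  /**
   * Returns the trailing number of the segment ID, or 0 if the ID does not
   * end with "_<digits>". Assumes the number fits in an int.
   */
  static int partitionIdOf(String segmentId)
  {
    int lastUnderscore = segmentId.lastIndexOf('_');
    if (lastUnderscore < 0) {
      return 0;
    }
    String tail = segmentId.substring(lastUnderscore + 1);
    if (tail.isEmpty()) {
      return 0;
    }
    for (int i = 0; i < tail.length(); i++) {
      char c = tail.charAt(i);
      if (c < '0' || c > '9') {
        // The tail is the version timestamp, so the partition ID is missing.
        return 0;
      }
    }
    return Integer.parseInt(tail);
  }
}
```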
   
   #### Changes in `ShardSpec`
   
   Currently, the partition ID of the segment is tightly coupled with the 
bucket in the hash/range partitioning. For example, in the hash partitioning, 
the partition ID is `(the hash value of the partition dimensions % partition 
size)`. In the range partitioning, the partition ID is the linearly increasing 
number for range-partitioned buckets.
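
To illustrate the current coupling for hash partitioning, here is a minimal 
sketch that maps the values of the partition dimensions to a bucket. 
`HashBucketSketch` and `bucketFor` are hypothetical names, and 
`List.hashCode()` is only an assumed stand-in for whatever hash function Druid 
actually uses.

```java
import java.util.List;

final class HashBucketSketch
{
  private HashBucketSketch() {}

  /**
   * Returns (hash of the partition dimension values) % numBuckets.
   * Math.floorMod keeps the result in [0, numBuckets) for negative hashes.
   */
  static int bucketFor(List<String> partitionDimensionValues, int numBuckets)
  {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive");
    }
    return Math.floorMod(partitionDimensionValues.hashCode(), numBuckets);
  }
}
```

Today this value is used directly as the partition ID, which is why a second 
segment for the same bucket has no partition ID available to it.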
   
   However, to support append, we need a new way to tell whether two given 
segments fall in the same bucket. To do this, the segment needs to keep 
additional information about the bucket, as below:
   
   ```java
   public interface ShardSpec
   {
   ...
     /**
      * Returns the bucket ID of this partition (segment). A bucket represents the secondary partition
      * in the hash or the range partitioning. This always returns 0 for the linear partitioning.
      */
     short getBucketId();
   ...
   }
   ```
   
   Note that we need to track all segments in the same bucket to make segment 
pruning in the brokers easier. To keep things simple, the partition ID must be 
_aligned_ with the bucket ID, which means `partition ID = n * bucket ID`. As a 
result, the partition IDs might not be contiguous after appending new segments. 
This is also useful for compatibility because the bucket ID can be easily 
restored even when it's missing after a downgrade.
   
   This may cause some issues with the minor compaction because the minor 
compaction can compact segments only when their partition IDs are adjacent. See 
the Future work section for this issue.
   
   #### Changes in the Simple task and the Parallel task
   
   The index task needs to allow `partitionsSpec` when `forceGuaranteedRollup` 
= false because the append doesn't necessarily guarantee the perfect rollup 
across the entire datasource. If `appendToExisting` = true and 
`forceGuaranteedRollup` = false in the ingestionSpec, then the indexing task 
will create the _perfectly rolled up segments_ from the input and append them 
to an existing datasource. This is to create more compact segments, but I guess 
it might be useful to not create perfectly rolled up segments in some cases. 
Please see the Future work section for more details.
   
   If `appendToExisting` = true, the `partitionsSpec` can be optional.
   
   - If it's an empty time chunk, the given `partitionsSpec` will be used.
   - If it's not an empty time chunk, the given `partitionsSpec` will be 
ignored with a warning in the task logs.
     - When the parallel index task appends to a non-empty datasource with the 
range partitioning, it can skip the first phase to find the best partitioning.
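
The two cases above can be sketched as follows. `PartitionsSpecResolver` and 
`resolve` are hypothetical names, and the specs are modeled as plain strings 
rather than Druid's `partitionsSpec` types. What happens when the time chunk is 
empty and no spec is given is not stated in this proposal, so the sketch simply 
returns an empty result in that case.

```java
import java.util.List;
import java.util.Optional;

final class PartitionsSpecResolver
{
  private PartitionsSpecResolver() {}

  /**
   * @param existingSpec partitioning of the segments already in the time chunk,
   *                     or empty if the time chunk has no segments
   * @param givenSpec    the partitionsSpec from the ingestion spec, if any
   * @param warnings     collects messages that would go to the task logs
   */
  static Optional<String> resolve(
      Optional<String> existingSpec,
      Optional<String> givenSpec,
      List<String> warnings
  )
  {
    if (!existingSpec.isPresent()) {
      // Empty time chunk: the given partitionsSpec is used as is.
      return givenSpec;
    }
    if (givenSpec.isPresent()) {
      // Non-empty time chunk: the given partitionsSpec is ignored with a warning.
      warnings.add(
          "Ignoring partitionsSpec[" + givenSpec.get()
          + "] because the time chunk is already partitioned with[" + existingSpec.get() + "]"
      );
    }
    return existingSpec;
  }
}
```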
   
   If `appendToExisting` = true, the indexing task should use the action-based 
segment allocator, which is a centralized segment allocator in the overlord, to 
find proper partition IDs for new segments. When it requests new segment IDs, 
the indexing task should send the proper bucket ID to the overlord. As a 
result, `SegmentAllocateAction` would be:
   
   ```java
     @JsonCreator
     public SegmentAllocateAction(
         @JsonProperty("dataSource") String dataSource,
         @JsonProperty("timestamp") DateTime timestamp,
         @JsonProperty("queryGranularity") Granularity queryGranularity,
         @JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
         @JsonProperty("sequenceName") String sequenceName,
         @JsonProperty("previousSegmentId") String previousSegmentId,
         @JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
         @JsonProperty("bucketId") @Nullable Short bucketId, // new field
         @JsonProperty("shardSpecFactory") @Nullable ShardSpecFactory shardSpecFactory, // nullable for backward compatibility
         @JsonProperty("lockGranularity") @Nullable LockGranularity lockGranularity // nullable for backward compatibility
     )
   ...
   ```
   
   The `SegmentAllocateAction` will return a proper partition ID that is 
aligned with the given bucket ID.
   
   #### Backward compatibility
   
   - The `shardSpec` computes the bucket ID by `partition ID % core partition 
size` if it's missing in the JSON. The core partition size is the number of 
segments when the first task 
   - The requested bucket ID in the `SegmentAllocateAction` is regarded as 0 if 
it's missing in the JSON.
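
A minimal sketch of these two fallbacks, assuming the bucket ID arrives as a 
nullable `Short` as in the `SegmentAllocateAction` signature above. 
`BucketIdCompat` and its method names are hypothetical, and how the core 
partition size is obtained is left to the caller.

```java
final class BucketIdCompat
{
  private BucketIdCompat() {}

  /**
   * Bucket ID of a shardSpec: the stored value if present, otherwise
   * partition ID % core partition size.
   */
  static short bucketIdOf(Short bucketIdFromJson, int partitionId, int corePartitionSize)
  {
    if (bucketIdFromJson != null) {
      return bucketIdFromJson;
    }
    if (corePartitionSize <= 0) {
      throw new IllegalArgumentException("corePartitionSize must be positive");
    }
    return (short) (partitionId % corePartitionSize);
  }

  /** Requested bucket ID of an allocate action: 0 if it is missing in the JSON. */
  static short requestedBucketId(Short bucketIdFromJson)
  {
    if (bucketIdFromJson == null) {
      return (short) 0;
    }
    return bucketIdFromJson;
  }
}
```

With a core partition size of 4, a segment written by an older version with 
partition ID 5 would be treated as belonging to bucket 1.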
   
   ### Rationale
   
   These are dropped ideas.
   
   - The "Multi" input source
     - Just like the Hadoop task, the native task can support the Multi input 
source to read data from two or more input sources. This indirectly supports a 
sort of "append" by specifying both the existing segments and new data in the 
input source spec. However, the main use case of the Multi input source would 
be reading from multiple input sources rather than append.
   - Independent partition ID and bucket ID
     - To make the partition ID and the bucket ID independent, the 
`VersionedIntervalTimeline` should be able to efficiently filter out the 
segments of the same bucket. This requires an extra index or data structure in 
memory of the brokers and the coordinator.
   
   ### Operational impact
   
   - The broker and the coordinator can use a bit more memory because of the 
new field in the `shardSpec`.
   - Deprecate appending linearly partitioned segments to hash-partitioned 
datasources
     - `HashBasedNumberedShardSpec` is being used to allow appending linearly 
partitioned segments to hash-partitioned datasources. However, I don't think 
this is very useful once we support appending hash-partitioned segments to 
hash-partitioned datasources.
   - There should be no issues in rolling upgrades and downgrades.
   
   ### Test plan
   
   - New integration tests will be added.
   - Will test in a real Druid cluster.
   - Will verify there is no issue in rolling upgrades/downgrades.
   
   ### Future work
   
   - When a task creates new segments with the hash/range partitioning, it can 
split a bucket into multiple segments based on the segment size (or number of 
rows). This can be thought of as supporting the fixed linear partitioning for 
the _tertiary_ partitioning.
   - Partition-aware minor compaction
     - The minor compaction should be able to compact the segments of the same 
bucket. This may require changing the meaning of the "adjacent partition IDs" 
per partitioning method.
   - In this proposal, when `appendToExisting` = true and 
`forceGuaranteedRollup` = false, the indexing task always creates new segments 
which are perfectly rolled up. We may want to add a new configuration, 
`partitionWithShuffle`, to control this behavior.
   
   Semi-related, the segment pruning in brokers should be supported for hash 
partitioned datasources as well.
