Hi devs,



With no further discussion in thread [1], I'd like to start a vote on
PIP-21: Introduce Range Partition And Sort in Append Scalable Table
Batch Writing for Flink [2].

The vote will be open for at least 72 hours unless there is an objection
or insufficient votes.




Best regards,

Wencong




[1] https://lists.apache.org/thread/pyhnjcq259897sb84fprk1362b1dwcsb 

[2] https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink


At 2024-05-09 09:43:39, "Jingsong Li" <[email protected]> wrote:
>Thanks Wencong for updating.
>
>+1
>
>Best,
>Jingsong
>
>On Wed, May 8, 2024 at 11:02 PM Wencong Liu <[email protected]> wrote:
>>
>> Thank you for the responses from Xintong and Jingsong.
>>
>>
>>
>>
>> To Xintong,
>>
>> > Q1:
>>
>> The naming conventions for identifiers in Flink SQL are consistent
>>
>> with the SQL standard, meaning identifiers cannot contain spaces or
>>
>> commas, so using commas as separators is not a problem.
>>
>>
>>
>>
>> When this configuration item is not specified or is specified as an empty 
>> string,
>>
>> it means that the range partition write feature is not enabled.
>>
>> I have added the related explanation in the description of the configuration 
>> item.
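>>
>> To make the two points above concrete, here is a rough sketch (illustrative
>> only, not the actual Paimon code; the class and method below are hypothetical,
>> only the option key comes from the PIP) of how the value could be interpreted:
>>
>> import java.util.Arrays;
>> import java.util.Collections;
>> import java.util.List;
>> import java.util.Map;
>>
>> class ClusteringColumnsParser {
>>     static List<String> clusteringColumns(Map<String, String> options) {
>>         String value = options.get("sink.clustering.by-columns");
>>         if (value == null || value.trim().isEmpty()) {
>>             // Not specified or an empty string: range partition writing is disabled.
>>             return Collections.emptyList();
>>         }
>>         // SQL identifiers cannot contain commas, so a plain split is unambiguous.
>>         return Arrays.asList(value.trim().split(","));
>>     }
>> }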
>>
>>
>>
>>
>> > Q2:
>>
>> Following our offline discussions with Jingsong, we all agree that it is
>>
>> necessary to clearly state the specific judgment rules corresponding to
>>
>> the auto mode in the description of the 'sink.clustering.strategy'
>>
>> configuration item.
>>
>>
>>
>>
>> > Q3:
>>
>> In the current implementation, the total sample size equals the number
>>
>> of range partitions times 100, which aligns with Spark's implementation. 
>> Spark
>>
>> allows controlling the number of samples per range partition through the
>>
>> "spark.sql.execution.rangeExchange.sampleSizePerPartition" parameter, with a
>>
>> default of 100. Similarly, we can provide a 
>> 'sink.clustering.sample-size-in-cluster'
>>
>> parameter for control.
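>>
>> As a worked example of the sampling volume (the numbers below are
>> illustrative; only the 100-per-partition default comes from the text above):
>>
>> int samplesPerPartition = 100;   // proposed default, mirroring Spark
>> int numRangePartitions = 16;     // e.g. the number of range partitions of the sink
>> int totalSampleSize = numRangePartitions * samplesPerPartition;   // 1600 sampled records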
>>
>>
>>
>>
>> To Jingsong,
>>
>> I agree with your suggestion. Renaming the 'sink.clustering.sort-partition'
>>
>> configuration item to 'sink.clustering.sort-in-cluster' makes it
>>
>> easier to understand. The default value should be `true`, and the
>>
>> relevant description has been updated in the PIP.
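>>
>> For illustration only (the option keys are the ones discussed in this thread;
>> the values and the helper method below are hypothetical examples, not the
>> final PIP definition), the proposed options could be set together like this:
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> class ClusteringOptionsExample {
>>     static Map<String, String> exampleClusteringOptions() {
>>         Map<String, String> options = new HashMap<>();
>>         options.put("sink.clustering.by-columns", "f0,f1");      // columns to cluster by
>>         options.put("sink.clustering.strategy", "auto");         // 'auto' picks an algorithm automatically
>>         options.put("sink.clustering.sort-in-cluster", "true");  // default: sort within each range
>>         return options;
>>     }
>> }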
>>
>>
>>
>>
>> Best,
>>
>> Wencong
>>
>>
>>
>>
>>
>> At 2024-05-06 14:57:05, "Jingsong Li" <[email protected]> wrote:
>> >Hi Wencong,
>> >
>> >I misunderstood the meaning of sort-partition; it is easily confused with
>> >table partitions.
>> >
>> >I suggest we change 'sink.clustering.sort-partition' to
>> >'sink.clustering.local-sort'.
>> >
>> >And I think the default value should be true. Compared to distributed
>> >range sort, local sorting is a low-cost behavior and we should
>> >complete it by default.
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Apr 29, 2024 at 9:42 PM Jingsong Li <[email protected]> wrote:
>> >>
>> >> I second Xintong's suggestion: we can just let the default value be
>> >> 'order'. 'auto' is too early for us now; you can take a look at other systems.
>> >>
>> >> And for sink.clustering.sort-partition:
>> >> Indicates whether to further sort each partition after range 
>> >> partitioning, enhancing data orderliness within each partition.
>> >>
>> >> Maybe adding partition fields to the range sort is better? We already have a
>> >> spill mechanism to avoid OOM in writing, so this alone looks not so useful. But
>> >> range sorting by the partition fields is useful: it can reduce small files.
>> >>
>> >> Xintong Song <[email protected]> wrote on Mon, Apr 29, 2024 at 15:26:
>> >>>
>> >>> +1 for the proposal in general. The feature should significantly improve
>> >>> the performance of downstream workloads that read data from the tables.
>> >>>
>> >>> I have a few suggestions / questions.
>> >>>
>> >>> 1. For `sink.clustering.by-columns`, I think it would be nice to 
>> >>> explicitly
>> >>> mention that not specified (or null) means the feature is not enabled.
>> >>>
>> >>> 2. For `sink.clustering.strategy`, I'd suggest not to expose the 
>> >>> behaviors
>> >>> when the value is `auto` to users. For this developer-oriented PIP
>> >>> document, it's important to make the behavior clear so that people can 
>> >>> vote
>> >>> on it. But for the user-oriented configuration description, `auto` would
>> >>> simply mean the system would automatically choose a strategy and users
>> >>> don't need to worry about it. Moreover, not exposing the behavior would
>> >>> give us the chance to change it in future if necessary, without breaking
>> >>> any commitment that we made to users.
>> >>>
>> >>> 3. I'd like to understand a bit more about the sampling strategy. In
>> >>> particular, how much data is sampled out of the entire data set? Is this
>> >>> decided by a certain sampling rate, or is the amount of samples fixed
>> >>> regardless of the size of the data set? Should the rate / amount be
>> >>> configurable, or any practices suggest that a hard-coded parameter works
>> >>> fine in most use cases?
>> >>>
>> >>> Best,
>> >>>
>> >>> Xintong
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Apr 23, 2024 at 10:59 PM Wencong Liu <[email protected]> 
>> >>> wrote:
>> >>>
>> >>> > Thanks for your reply.
>> >>> > 1. Yes. The LocalSample will receive data emitted by the
>> >>> > Upstream Operator and perform sampling. The
>> >>> > specific sampling algorithm used is reservoir sampling [1].
>> >>> > 2. Assign Range Index will wait until all records have
>> >>> > been consumed by Local Sample and the result
>> >>> > is generated by Global Sample.
>> >>> >
>> >>> > [1] https://arxiv.org/pdf/1903.12065v1.pdf
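>> >>> >
>> >>> > For reference, a minimal self-contained sketch of classic reservoir
>> >>> > sampling (Algorithm R); this is illustrative only, not the actual
>> >>> > operator implementation, and the class name is hypothetical:
>> >>> >
>> >>> > import java.util.ArrayList;
>> >>> > import java.util.List;
>> >>> > import java.util.Random;
>> >>> >
>> >>> > class ReservoirSampler<T> {
>> >>> >     private final int k;                          // number of samples to keep
>> >>> >     private final List<T> reservoir = new ArrayList<>();
>> >>> >     private final Random random = new Random();
>> >>> >     private long seen = 0;
>> >>> >
>> >>> >     ReservoirSampler(int k) {
>> >>> >         this.k = k;
>> >>> >     }
>> >>> >
>> >>> >     void add(T record) {
>> >>> >         seen++;
>> >>> >         if (reservoir.size() < k) {
>> >>> >             reservoir.add(record);                // fill the reservoir first
>> >>> >         } else {
>> >>> >             // Replace a random slot with probability k / seen, so every
>> >>> >             // record seen so far stays in the reservoir with probability k / seen.
>> >>> >             long j = (long) (random.nextDouble() * seen);
>> >>> >             if (j < k) {
>> >>> >                 reservoir.set((int) j, record);
>> >>> >             }
>> >>> >         }
>> >>> >     }
>> >>> >
>> >>> >     List<T> samples() {
>> >>> >         return reservoir;
>> >>> >     }
>> >>> > }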
>> >>> >
>> >>> > At 2024-04-23 20:48:45, "wj wang" <[email protected]> wrote:
>> >>> > >Hi, Wencong
>> >>> > >I have two small questions.
>> >>> > >1. Will all records be emitted from `Upstream Operator` to `Local
>> >>> > >Sample`? If not, what is the sampling rule?
>> >>> > >2. From the PIP, I infer that the records in `Assign Range Index` should
>> >>> > >wait for the broadcast result from `Global Sample`. How long do they
>> >>> > >wait? Until all records have been consumed by `Local Sample`, or not?
>> >>> > >
>> >>> > >Best,
>> >>> > >wangwj
>> >>> > >
>> >>> > >On Mon, Apr 22, 2024 at 6:20 PM Jingsong Li <[email protected]>
>> >>> > wrote:
>> >>> > >>
>> >>> > >> +1 for your proposal.
>> >>> > >>
>> >>> > >> You can add to the description.
>> >>> > >>
>> >>> > >> Best,
>> >>> > >> Jingsong
>> >>> > >>
>> >>> > >> On Mon, Apr 22, 2024 at 6:15 PM Wencong Liu <[email protected]>
>> >>> > wrote:
>> >>> > >> >
>> >>> > >> > Hi Jingsong,
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > This topic requires discussion, hence it wasn't directly 
>> >>> > >> > addressed in
>> >>> > the PIP.
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > I believe the type of sorting algorithm to use depends on the 
>> >>> > >> > number
>> >>> > of
>> >>> > >> > fields specified by the user for comparison. When only one 
>> >>> > >> > comparison
>> >>> > field is
>> >>> > >> > specified, it's best to use basic data types for direct comparison
>> >>> > for the most accurate
>> >>> > >> > results. For multiple comparison fields, both the Z-order curve 
>> >>> > >> > and
>> >>> > Hilbert curve algorithms
>> >>> > >> > are suitable. In such cases, data maintains a certain level of 
>> >>> > >> > order
>> >>> > in any comparison
>> >>> > >> > field. Generally, the computation cost of the Z-order curve 
>> >>> > >> > algorithm
>> >>> > is lower
>> >>> > >> > than that of the Hilbert curve algorithm. However, in 
>> >>> > >> > high-dimensional
>> >>> > >> > scenarios, the Hilbert curve has an advantage in maintaining data
>> >>> > clustering.
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > Therefore, I propose an automatic selection based on the number of
>> >>> > >> > comparison columns:
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > 1 column: Basic type comparison algorithm.
>> >>> > >> >
>> >>> > >> > Less than 5 columns: Z-order curve algorithm.
>> >>> > >> >
>> >>> > >> > 5 or more columns: Hilbert curve algorithm.
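>> >>> > >> >
>> >>> > >> > A rough sketch of this auto-selection rule (the enum and method are
>> >>> > >> > hypothetical names; only the thresholds come from this proposal):
>> >>> > >> >
>> >>> > >> > enum ClusteringStrategy { ORDER, ZORDER, HILBERT }
>> >>> > >> >
>> >>> > >> > static ClusteringStrategy chooseStrategy(int clusteringColumnCount) {
>> >>> > >> >     if (clusteringColumnCount == 1) {
>> >>> > >> >         return ClusteringStrategy.ORDER;    // direct comparison of the basic type
>> >>> > >> >     } else if (clusteringColumnCount < 5) {
>> >>> > >> >         return ClusteringStrategy.ZORDER;   // cheaper to compute
>> >>> > >> >     } else {
>> >>> > >> >         return ClusteringStrategy.HILBERT;  // better clustering in high dimensions
>> >>> > >> >     }
>> >>> > >> > }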
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > The threshold of 5 columns is based on Ververica's practice with
>> >>> > Paimon
>> >>> > >> > Append Scalable tables, which was also discussed offline with 
>> >>> > >> > Junhao
>> >>> > Ye.
>> >>> > >> > In addition to automatic configuration, users can fine-tune for
>> >>> > specific
>> >>> > >> > scenarios by explicitly specifying the desired comparison 
>> >>> > >> > strategy.
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > WDYT?
>> >>> > >> >
>> >>> > >> >
>> >>> > >> >
>> >>> > >> > Best,
>> >>> > >> >
>> >>> > >> > Wencong
>> >>> > >> >
>> >>> > >> > At 2024-04-22 15:08:09, "Jingsong Li" <[email protected]> 
>> >>> > >> > wrote:
>> >>> > >> > >Hi Wencong,
>> >>> > >> > >
>> >>> > >> > >Mostly looks good to me.
>> >>> > >> > >
>> >>> > >> > >"it will automatically determine the algorithm based on the 
>> >>> > >> > >number of
>> >>> > >> > >columns in 'sink.clustering.by-columns'. "
>> >>> > >> > >
>> >>> > >> > >Please describe this clearly in the `Description`.
>> >>> > >> > >
>> >>> > >> > >Best,
>> >>> > >> > >Jingsong
>> >>> > >> > >
>> >>> > >> > >On Mon, Apr 22, 2024 at 2:36 PM Wencong Liu 
>> >>> > >> > ><[email protected]>
>> >>> > wrote:
>> >>> > >> > >>
>> >>> > >> > >> Hi devs,
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> I'm proposing a new feature to introduce range partitioning and
>> >>> > sorting in append scalable table
>> >>> > >> > >>
>> >>> > >> > >> writing for Flink. The goal is to optimize query performance by
>> >>> > reducing data scans on large datasets.
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> The proposal includes:
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> 1. Configurable range partitioning and sorting during data 
>> >>> > >> > >> writing
>> >>> > which allows for
>> >>> > >> > >>
>> >>> > >> > >> a more efficient data distribution strategy.
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> 2. Introduction of new configurations that will enable users to
>> >>> > specify columns for
>> >>> > >> > >>
>> >>> > >> > >> comparison, choose a comparison algorithm for range 
>> >>> > >> > >> partitioning,
>> >>> > and further sort each
>> >>> > >> > >>
>> >>> > >> > >> partition if required.
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> 3. Detailed explanation of the division of processing steps 
>> >>> > >> > >> when
>> >>> > range partitioning
>> >>> > >> > >>
>> >>> > >> > >> is enabled and the conditional inclusion of the sorting phase.
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> Looking forward to discussing this in the upcoming PIP [1].
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> Best regards,
>> >>> > >> > >>
>> >>> > >> > >> Wencong Liu
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >>
>> >>> > >> > >> [1]
>> >>> > https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink
>> >>> >
