Thank you for the responses from Xintong and Jingsong.

To Xintong,

> Q1: The naming conventions for identifiers in Flink SQL are consistent with the SQL standard, meaning identifiers cannot contain spaces or commas, so using commas as separators is not a problem. When this configuration item is not specified, or is specified as an empty string, the range partition write feature is not enabled. I have added this explanation to the description of the configuration item.

> Q2: Following our offline discussion with Jingsong, we all agree that it is necessary to clearly state the specific judgment rules corresponding to the auto mode in the description of the 'sink.clustering.strategy' configuration item.

> Q3: In the current implementation, the total sample size equals the number of range partitions times 100, which aligns with Spark's implementation. Spark allows controlling the number of samples per range partition through the "spark.sql.execution.rangeExchange.sampleSizePerPartition" parameter, with a default of 100. Similarly, we can provide a 'sink.clustering.sample-size-in-cluster' parameter for control.

To Jingsong,

I agree with your opinion. Renaming the 'sink.clustering.sort-partition' configuration item to 'sink.clustering.sort-in-cluster' makes it easier to understand. The default value should be `true`, and the relevant description has been updated in the PIP.

Best,
Wencong

At 2024-05-06 14:57:05, "Jingsong Li" <[email protected]> wrote:
>Hi Wencong,
>
>I misunderstood the meaning of sort-partition, it is easily confused
>with table partition.
>
>I suggest we can change 'sink.clustering.sort-partition' to
>'sink.clustering.local-sort'.
>
>And I think the default value should be true. Compared to distributed
>range sort, local sorting is a low-cost behavior and we should
>complete it by default.
>
>Best,
>Jingsong
>
>On Mon, Apr 29, 2024 at 9:42 PM Jingsong Li <[email protected]> wrote:
>>
>> I second Xintong’s suggestion, we can just let the default value be order,
>> auto is too early for us now, you can take a look at other systems.
>>
>> And for sink.clustering.sort-partition:
>> Indicates whether to further sort each partition after range partitioning,
>> enhancing data orderliness within each partition.
>>
>> Maybe adding partition fields to range sort is better? We already have spill
>> mechanism to avoid OOM in writing. This looks not so useful. But, range sort
>> to partition fields is useful. Can reduce small files.
>>
>> Xintong Song <[email protected]> wrote on Mon, Apr 29, 2024 at 15:26:
>>>
>>> +1 for the proposal in general. The feature should significantly improve
>>> the performance that downstream workloads read data from the tables.
>>>
>>> I have a few suggestions / questions.
>>>
>>> 1. For `sink.clustering.by-columns`, I think it would be nice to explicitly
>>> mention that not specified (or null) means the feature is not enabled.
>>>
>>> 2. For `sink.clustering.strategy`, I'd suggest not to expose the behaviors
>>> when the value is `auto` to users. For this developer-oriented PIP
>>> document, it's important to make the behavior clear so that people can vote
>>> on it. But for the user-oriented configuration description, `auto` would
>>> simply mean the system would automatically choose a strategy and users
>>> don't need to worry about it. Moreover, not exposing the behavior would
>>> give us the chance to change it in future if necessary, without breaking
>>> any commitment that we made to users.
>>>
>>> 3. I'd like to understand a bit more about the sampling strategy. In
>>> particular, how much data is sampled out of the entire data set? Is this
>>> decided by a certain sampling rate, or is the amount of samples fixed
>>> regardless of the size of the data set?
>>> Should the rate / amount be
>>> configurable, or any practices suggest that a hard-coded parameter works
>>> fine in most use cases?
>>>
>>> Best,
>>>
>>> Xintong
>>>
>>> On Tue, Apr 23, 2024 at 10:59 PM Wencong Liu <[email protected]> wrote:
>>>
>>> > Thanks for your reply.
>>> > 1. Yes. The LocalSample will receive data emitted by the
>>> > Upstream Operator and perform sampling. The
>>> > specific sampling algorithm used is reservoir sampling [1].
>>> > 2. Assign Range Index will wait until all records have
>>> > been consumed by Local Sample and the result
>>> > is generated by Global Sample.
>>> >
>>> > [1] https://arxiv.org/pdf/1903.12065v1.pdf
>>> >
>>> > At 2024-04-23 20:48:45, "wj wang" <[email protected]> wrote:
>>> > >Hi, Wencong
>>> > >I have two small questions.
>>> > >1. All records will be emitted from `Upstream Operator` to `Local
>>> > >Sample`? If not, what is the sample rule?
>>> > >2. From pip, I infer that the record in `Assign Range Index` should
>>> > >wait for the broadcast result from `Global Sample`. So how long do they
>>> > >wait? Until all records have been consumed by `Local Sample` or not?
>>> > >
>>> > >Best,
>>> > >wangwj
>>> > >
>>> > >On Mon, Apr 22, 2024 at 6:20 PM Jingsong Li <[email protected]> wrote:
>>> > >>
>>> > >> +1 for your proposal.
>>> > >>
>>> > >> You can add to the description.
>>> > >>
>>> > >> Best,
>>> > >> Jingsong
>>> > >>
>>> > >> On Mon, Apr 22, 2024 at 6:15 PM Wencong Liu <[email protected]> wrote:
>>> > >> >
>>> > >> > Hi Jingsong,
>>> > >> >
>>> > >> > This topic requires discussion, hence it wasn't directly addressed in
>>> > >> > the PIP.
>>> > >> >
>>> > >> > I believe the type of sorting algorithm to use depends on the number of
>>> > >> > fields specified by the user for comparison.
>>> > >> > When only one comparison field is
>>> > >> > specified, it's best to use basic data types for direct comparison
>>> > >> > for the most accurate results. For multiple comparison fields, both
>>> > >> > the Z-order curve and Hilbert curve algorithms are suitable. In such
>>> > >> > cases, data maintains a certain level of order in any comparison
>>> > >> > field. Generally, the computation cost of the Z-order curve algorithm
>>> > >> > is lower than that of the Hilbert curve algorithm. However, in
>>> > >> > high-dimensional scenarios, the Hilbert curve has an advantage in
>>> > >> > maintaining data clustering.
>>> > >> >
>>> > >> > Therefore, I propose an automatic selection based on the number of
>>> > >> > comparison columns:
>>> > >> >
>>> > >> > 1 column: Basic type comparison algorithm.
>>> > >> >
>>> > >> > Fewer than 5 columns: Z-order curve algorithm.
>>> > >> >
>>> > >> > 5 or more columns: Hilbert curve algorithm.
>>> > >> >
>>> > >> > The threshold of 5 columns is based on Ververica's practice with Paimon
>>> > >> > Append Scalable tables, which was also discussed offline with Junhao Ye.
>>> > >> > In addition to automatic configuration, users can fine-tune for specific
>>> > >> > scenarios by explicitly specifying the desired comparison strategy.
>>> > >> >
>>> > >> > WDYT?
>>> > >> >
>>> > >> > Best,
>>> > >> >
>>> > >> > Wencong
>>> > >> >
>>> > >> > At 2024-04-22 15:08:09, "Jingsong Li" <[email protected]> wrote:
>>> > >> > >Hi Wencong,
>>> > >> > >
>>> > >> > >Mostly looks good to me.
>>> > >> > >
>>> > >> > >"it will automatically determine the algorithm based on the number of
>>> > >> > >columns in 'sink.clustering.by-columns'. "
>>> > >> > >
>>> > >> > >Please describe this clearly in the `Description`.
>>> > >> > >
>>> > >> > >Best,
>>> > >> > >Jingsong
>>> > >> > >
>>> > >> > >On Mon, Apr 22, 2024 at 2:36 PM Wencong Liu <[email protected]> wrote:
>>> > >> > >>
>>> > >> > >> Hi devs,
>>> > >> > >>
>>> > >> > >> I'm proposing a new feature to introduce range partitioning and
>>> > >> > >> sorting in append scalable table writing for Flink. The goal is to
>>> > >> > >> optimize query performance by reducing data scans on large datasets.
>>> > >> > >>
>>> > >> > >> The proposal includes:
>>> > >> > >>
>>> > >> > >> 1. Configurable range partitioning and sorting during data writing
>>> > >> > >> which allows for a more efficient data distribution strategy.
>>> > >> > >>
>>> > >> > >> 2. Introduction of new configurations that will enable users to
>>> > >> > >> specify columns for comparison, choose a comparison algorithm for
>>> > >> > >> range partitioning, and further sort each partition if required.
>>> > >> > >>
>>> > >> > >> 3. Detailed explanation of the division of processing steps when
>>> > >> > >> range partitioning is enabled and the conditional inclusion of the
>>> > >> > >> sorting phase.
>>> > >> > >>
>>> > >> > >> Looking forward to discussing this in the upcoming PIP [1].
>>> > >> > >>
>>> > >> > >> Best regards,
>>> > >> > >>
>>> > >> > >> Wencong Liu
>>> > >> > >>
>>> > >> > >> [1] https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink
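
For readers skimming the thread, the rules discussed above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the Paimon implementation. The function names and the strategy labels "order", "zorder" and "hilbert" are hypothetical. The sampler is the textbook reservoir sampling (Algorithm R), which may differ from what the Local Sample operator actually does.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")

# Assumption: 100 samples per range partition, the figure quoted in the thread.
DEFAULT_SAMPLES_PER_PARTITION = 100


def parse_clustering_columns(value: Optional[str]) -> List[str]:
    """Split a comma-separated column list; None or an empty string yields []."""
    if not value:
        return []
    return [col.strip() for col in value.split(",") if col.strip()]


def choose_strategy(columns: List[str]) -> Optional[str]:
    """Apply the proposed 'auto' rule; the returned labels are hypothetical."""
    if not columns:
        return None  # no clustering columns: the feature is not enabled
    if len(columns) == 1:
        return "order"  # plain comparison on a single column
    if len(columns) < 5:
        return "zorder"  # 2 to 4 columns: Z-order curve
    return "hilbert"  # 5 or more columns: Hilbert curve


def total_sample_size(
    num_range_partitions: int,
    samples_per_partition: int = DEFAULT_SAMPLES_PER_PARTITION,
) -> int:
    """Total sample size = number of range partitions * samples per partition."""
    return num_range_partitions * samples_per_partition


def reservoir_sample(records: Iterable[T], k: int, rng: random.Random) -> List[T]:
    """Keep a uniform random sample of at most k records in a single pass."""
    reservoir: List[T] = []
    for i, record in enumerate(records):
        if i < k:
            reservoir.append(record)  # fill the reservoir first
        else:
            j = rng.randint(0, i)  # uniform over [0, i], both ends inclusive
            if j < k:
                reservoir[j] = record  # replace a slot with probability k / (i + 1)
    return reservoir
```

For example, `choose_strategy(parse_clustering_columns("a,b,c"))` picks the Z-order label, and `total_sample_size(8)` gives 800 with the assumed default of 100 samples per partition.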
