Jeyhun, Sorry for the delay. And thanks for the explanation, it sounds good to me!
Jeyhun Karimov <je.kari...@gmail.com> wrote on Sat, Mar 16, 2024 at 05:09:

> Hi Benchao,
>
> Thanks for your comments.
>
> > 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor, like 127 + 128,
> > could we just utilize one side's pre-partitioned attribute, and let
> > another side just do the shuffle?
>
> There are two cases we need to consider:
>
> 1. Static Partition (no partitions are added during query execution) is
> enabled AND both sources implement "SupportsPartitionPushdown"
>
> In this case, we are sure that no new partitions will be added at runtime.
> So, we have a chance to equalize both sources' partitions and parallelism,
> IFF both sources implement the "SupportsPartitionPushdown" interface.
> To achieve this, we first fetch the existing partitions from source1
> (say p_s1) and from source2 (say p_s2).
> Then, we find the intersection of these two partition sets (say
> p_intersect) and push down these partitions:
>
> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
> only the specific partitions are read
> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
> with the filtered partitions
>
> Lastly, we need to change the parallelism of 1) source1, 2) source2, and
> 3) all of their downstream operators until (and including) their first
> common ancestor (e.g., the join) to be equal to the number of partitions
> (the size of p_intersect).
>
> 2. All other cases
>
> In all other cases, the parallelism of both sources and their downstream
> operators until their common ancestor would be equal to MIN(p_s1, p_s2).
> That is, the minimum of the partition counts of source1 and source2 is
> selected as the parallelism.
> Coming back to your example: if source1 parallelism is 127 and source2
> parallelism is 128, then we first check the partition counts of source1
> and source2.
> Say the partition count of source1 is 100 and the partition count of
> source2 is 90. Then, we would set the parallelism of source1, source2,
> and all of their downstream operators until (and including) the join
> operator to 90 (min(100, 90)).
> We also plan to implement a cost-based decision instead of the rule-based
> one explained above (the MIN rule).
> One possible outcome of the cost-based estimation is to keep the
> partitions on one side and perform the shuffling on the other source.
>
> > 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > we don't consider parallelism, we just remove unnecessary shuffles
> > according to the distribution columns. After this FLIP, the
> > parallelism may be bundled with source's partitions, then how will
> > this optimization accommodate with FlinkExpandConversionRule, will you
> > also change downstream operator's parallelisms if we want to also
> > remove subsequent shuffles?
>
> - From my understanding of FlinkExpandConversionRule, its removal logic
> is agnostic to operator parallelism.
> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
> then this FLIP will search for another possible shuffle (the one closest
> to the source) to remove.
> If there is such an opportunity, this FLIP will remove that shuffle. So,
> from my understanding, FlinkExpandConversionRule and this optimization
> rule can work together safely.
> Please correct me if I misunderstood your question.
>
> > Regarding the new optimization rule, have you also considered to allow
> > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > example, source is pre-partitioned by a, b columns, if we are
> > consuming this source, and do a aggregate on a, b, c, can we utilize
> > this optimization?
>
> - Good point. Yes, there are some cases where non-strict mode will apply.
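To make the two parallelism rules above concrete, here is a minimal sketch; the class and method names are hypothetical and not part of the FLIP:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the two parallelism rules described above.
class PartitionParallelism {

    // Case 1 (static partitions, both sources implement SupportsPartitionPushdown):
    // push down only the common partitions; their count becomes the parallelism
    // of both sources and of all downstream operators up to (and including) the join.
    static Set<String> intersectPartitions(Set<String> pS1, Set<String> pS2) {
        Set<String> pIntersect = new HashSet<>(pS1);
        pIntersect.retainAll(pS2);
        return pIntersect;
    }

    // Case 2 (all other cases): parallelism = MIN(p_s1, p_s2),
    // i.e. the smaller of the two partition counts.
    static int minRuleParallelism(int partitionCountS1, int partitionCountS2) {
        return Math.min(partitionCountS1, partitionCountS2);
    }

    public static void main(String[] args) {
        Set<String> common = intersectPartitions(
                new HashSet<>(Arrays.asList("p0", "p1", "p2")),
                new HashSet<>(Arrays.asList("p1", "p2", "p3")));
        System.out.println(common.size());               // 2
        System.out.println(minRuleParallelism(100, 90)); // 90, as in the example above
    }
}
```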
> For example:
>
> - the pre-partitioned columns and the aggregate columns are the same but
> in a different order (e.g., the source is pre-partitioned w.r.t. a,b and
> the aggregate has a GROUP BY b,a)
> - the columns in the Exchange operator are a list-prefix of the
> pre-partitioned columns of the source (e.g., the source is pre-partitioned
> w.r.t. a,b,c and the Exchange's partition columns are a,b)
>
> Please let me know if the above answers your questions or if you have any
> other comments.
>
> Regards,
> Jeyhun
>
> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
>
> > Thanks Jeyhun for bringing up this discussion, it is really exciting;
> > +1 for the general idea.
> >
> > We also introduced a similar concept in Flink Batch internally to cope
> > with bucketed tables in Hive; it is a very important improvement.
> >
> > > One thing to note is that for join queries, the parallelism of each
> > > join source might be different. This might result in inconsistencies
> > > while using the pre-partitioned/pre-divided data (e.g., different
> > > mappings of partitions to source operators).
> > > Therefore, it is the job of the planner to detect this and adjust the
> > > parallelism. With that in mind, the rest (how the split assigners
> > > perform) is consistent among many sources.
> >
> > Could you elaborate a little more on this? I added my two cents here
> > about this part:
> > 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor, like 127 + 128,
> > could we just utilize one side's pre-partitioned attribute, and let
> > another side just do the shuffle?
> > 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > we don't consider parallelism, we just remove unnecessary shuffles
> > according to the distribution columns.
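The two non-strict matches listed above (same columns in a different order, and Exchange columns forming a list-prefix of the source's pre-partitioned columns) amount to simple column-list checks. A sketch with hypothetical names:

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical sketch of the two non-strict matches described above.
class NonStrictMatch {

    // Same columns, different order: e.g. the source is pre-partitioned by
    // (a, b) and the aggregate has GROUP BY b, a.
    static boolean isPermutation(List<String> sourceCols, List<String> queryCols) {
        return sourceCols.size() == queryCols.size()
                && new HashSet<>(sourceCols).equals(new HashSet<>(queryCols));
    }

    // Exchange columns are a list-prefix of the pre-partitioned columns:
    // e.g. the source is pre-partitioned by (a, b, c) and the Exchange
    // partitions on (a, b).
    static boolean isListPrefix(List<String> sourceCols, List<String> exchangeCols) {
        return exchangeCols.size() <= sourceCols.size()
                && sourceCols.subList(0, exchangeCols.size()).equals(exchangeCols);
    }
}
```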
> > After this FLIP, the parallelism may be bundled with the source's
> > partitions, so how will this optimization accommodate
> > FlinkExpandConversionRule? Will you also change downstream operators'
> > parallelisms if we want to remove subsequent shuffles as well?
> >
> > Regarding the new optimization rule, have you also considered allowing
> > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > example, the source is pre-partitioned by columns a, b; if we are
> > consuming this source and do an aggregate on a, b, c, can we utilize
> > this optimization?
> >
> > Jane Chan <qingyue....@gmail.com> wrote on Thu, Mar 14, 2024 at 15:24:
> > >
> > > Hi Jeyhun,
> > >
> > > Thanks for your clarification.
> > >
> > > > Once a new partition is detected, we add it to our existing mapping.
> > > > Our mapping looks like Map<Integer, Set<Integer>>
> > > > subtaskToPartitionAssignment, where it maps each source subtaskID to
> > > > zero or more partitions.
> > >
> > > I understand your point. **It would be better if you could sync the
> > > content to the FLIP**.
> > >
> > > Another thing is I'm curious about what the physical plan looks like.
> > > Is there any specific info that will be added to the table source
> > > (like filter/project pushdown)? It would be great if you could attach
> > > an example to the FLIP.
> > >
> > > Bests,
> > > Jane
> > >
> > > On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jane,
> > > >
> > > > Thanks for your comments.
> > > >
> > > > > 1. Concerning the `sourcePartitions()` method, the partition
> > > > > information returned during the optimization phase may not be the
> > > > > same as the partition information during runtime execution. For
> > > > > long-running jobs, partitions may be continuously created. Is this
> > > > > FLIP equipped to handle such scenarios?
> > > >
> > > > - Good point. This scenario is definitely supported.
> > > > Once a new partition is added, or in general, new splits are
> > > > discovered, the
> > > > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
> > > > newSplits) method will be called. Inside that method, we are able to
> > > > detect whether a split belongs to an existing partition or to a new
> > > > one.
> > > > Once a new partition is detected, we add it to our existing mapping.
> > > > Our mapping looks like Map<Integer, Set<Integer>>
> > > > subtaskToPartitionAssignment, where it maps each source subtaskID to
> > > > zero or more partitions.
> > > >
> > > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > > > understand that it is also necessary to verify whether the hash
> > > > > key within the Exchange node is consistent with the partition key
> > > > > defined in the table source that implements
> > > > > `SupportsPartitioning`.
> > > >
> > > > - Yes, I overlooked that point; fixed. Actually, the rule is much
> > > > more complicated; I tried to simplify it in the FLIP. Good point.
> > > >
> > > > > 3. Could you elaborate on the desired physical plan and
> > > > > integration with `CompiledPlan` to enhance the overall
> > > > > functionality?
> > > >
> > > > - For the compiled plan, PartitioningSpec will be used, with the
> > > > JSON tag "Partitioning". As a result, the source operator will have
> > > > "abilities" : [ { "type" : "Partitioning" } ] as part of the
> > > > compiled plan.
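The bookkeeping described above (a Map<Integer, Set<Integer>> subtaskToPartitionAssignment updated when addSplits discovers a new partition) might look roughly like the sketch below; the round-robin assignment policy and all names are illustrative assumptions, not the FLIP's actual implementation:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the subtask-to-partition bookkeeping described above.
class SubtaskPartitionMapping {

    // Maps each source subtaskID to zero or more partitions, as in the FLIP.
    final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    private final int numSubtasks;
    private int nextSubtask = 0;

    SubtaskPartitionMapping(int numSubtasks) {
        this.numSubtasks = numSubtasks;
    }

    // Called when newly discovered splits reveal a partition. If the partition
    // is already known, nothing changes; otherwise it is assigned (here:
    // round-robin, purely for illustration) to one of the subtasks.
    void onPartitionDiscovered(int partitionId) {
        boolean known = subtaskToPartitionAssignment.values().stream()
                .anyMatch(partitions -> partitions.contains(partitionId));
        if (!known) {
            int subtask = nextSubtask;
            nextSubtask = (nextSubtask + 1) % numSubtasks;
            subtaskToPartitionAssignment
                    .computeIfAbsent(subtask, k -> new HashSet<>())
                    .add(partitionId);
        }
    }
}
```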
> > > > More about the implementation details below:
> > > >
> > > > --------------------------------
> > > > PartitioningSpec class
> > > > --------------------------------
> > > > @JsonTypeName("Partitioning")
> > > > public final class PartitioningSpec extends SourceAbilitySpecBase {
> > > >     // some code here
> > > >     @Override
> > > >     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> > > >         if (tableSource instanceof SupportsPartitioning) {
> > > >             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> > > >         } else {
> > > >             throw new TableException(
> > > >                     String.format(
> > > >                             "%s does not support SupportsPartitioning.",
> > > >                             tableSource.getClass().getName()));
> > > >         }
> > > >     }
> > > >     // some code here
> > > > }
> > > >
> > > > --------------------------------
> > > > SourceAbilitySpec class
> > > > --------------------------------
> > > > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> > > > @JsonSubTypes({
> > > >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > > >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > > >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > > > +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> > > >
> > > > Please let me know if that answers your questions or if you have
> > > > other comments.
> > > > Regards,
> > > > Jeyhun
> > > >
> > > > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:
> > > >
> > > > > Hi Jeyhun,
> > > > >
> > > > > Thank you for leading the discussion. I'm generally +1 with this
> > > > > proposal, along with some questions. Please see my comments below.
> > > > >
> > > > > 1. Concerning the `sourcePartitions()` method, the partition
> > > > > information returned during the optimization phase may not be the
> > > > > same as the partition information during runtime execution. For
> > > > > long-running jobs, partitions may be continuously created. Is this
> > > > > FLIP equipped to handle such scenarios?
> > > > >
> > > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > > > understand that it is also necessary to verify whether the hash
> > > > > key within the Exchange node is consistent with the partition key
> > > > > defined in the table source that implements `SupportsPartitioning`.
> > > > >
> > > > > 3. Could you elaborate on the desired physical plan and integration
> > > > > with `CompiledPlan` to enhance the overall functionality?
> > > > >
> > > > > Best,
> > > > > Jane
> > > > >
> > > > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
> > > > > <jhug...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Jeyhun,
> > > > > >
> > > > > > I like the idea! Given FLIP-376 [1], I wonder if it'd make sense
> > > > > > to generalize FLIP-434 to be about "pre-divided" data to cover
> > > > > > "buckets" and "partitions" (and maybe even situations where a
> > > > > > data source is partitioned and bucketed).
> > > > > >
> > > > > > Separate from that, the page mentions TPC-H Q1 as an example.
> > > > > > For a join, any two tables joined on the same bucket key should
> > > > > > provide a concrete example of a join.
> > > > > > Systems like Kafka Streams/ksqlDB call this "co-partitioning";
> > > > > > for those systems, it is a requirement placed on the input
> > > > > > sources [2]. For Flink, with FLIP-434, the proposed planner rule
> > > > > > could remove the shuffle.
> > > > > >
> > > > > > Definitely a fun idea; I look forward to hearing more!
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Jim
> > > > > >
> > > > > > 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > > > 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > > > >
> > > > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov
> > > > > > <je.kari...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > I'd like to start a discussion on FLIP-434: Support
> > > > > > > optimizations for pre-partitioned data sources [1].
> > > > > > >
> > > > > > > The FLIP introduces taking advantage of pre-partitioned data
> > > > > > > sources for SQL/Table API (it is already supported as an
> > > > > > > experimental feature in the DataStream API [2]).
> > > > > > >
> > > > > > > Please find more details in the FLIP wiki document [1].
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jeyhun
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > > > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> >
> > --
> > Best,
> > Benchao Li

--
Best,
Benchao Li