Jeyhun,

Sorry for the delay. And thanks for the explanation, it sounds good to me!

Jeyhun Karimov <je.kari...@gmail.com> wrote on Sat, Mar 16, 2024 at 05:09:
>
> Hi Benchao,
>
> Thanks for your comments.
>
> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor, like 127 + 128,
> > could we just utilize one side's pre-partitioned attribute, and let
> the other side just do the shuffle?
>
>
> There are two cases we need to consider:
>
> 1. Static Partition (no partitions are added during the query execution) is
> enabled AND both sources implement "SupportsPartitionPushdown"
>
> In this case, we are sure that no new partitions will be added at runtime.
> So, we have a chance to equalize both sources' partitions and parallelism, iff
> both sources implement the "SupportsPartitionPushdown" interface.
> To achieve so, first we will fetch the existing partitions from source1
> (say p_s1) and from source2 (say p_s2).
> Then, we find the intersection of these two partition sets (say
> p_intersect) and pushdown these partitions:
>
> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
> only specific partitions are read
> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
> with filtered partitions
>
> Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
> all of their downstream operators until (and including) their first common
> ancestor (e.g., join) to be equal to the number of partitions (size of
> p_intersect).
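As an illustration only (not actual planner code; the class and helper method below are hypothetical), the case-1 logic of intersecting the two partition sets and deriving the parallelism could be sketched as:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of case 1: intersect the partition sets of both
// sources and use the size of the intersection as the parallelism.
public class PartitionIntersectionSketch {

    // Returns the partitions that exist in both sources (p_intersect).
    static Set<String> intersectPartitions(Set<String> pS1, Set<String> pS2) {
        Set<String> intersection = new HashSet<>(pS1);
        intersection.retainAll(pS2);
        return intersection;
    }

    public static void main(String[] args) {
        Set<String> pS1 = new HashSet<>(List.of("p0", "p1", "p2"));
        Set<String> pS2 = new HashSet<>(List.of("p1", "p2", "p3"));

        Set<String> pIntersect = intersectPartitions(pS1, pS2);
        // In the planner, p_intersect would then be pushed to both sources:
        //   SupportsPartitionPushDown::applyPartitions(p_intersect)
        //   SupportsPartitioning::applyPartitionedRead(p_intersect)
        // and the parallelism of both sources (and all downstream operators
        // up to and including the join) would be set to p_intersect.size().
        int parallelism = pIntersect.size();
        System.out.println("p_intersect = " + pIntersect + ", parallelism = " + parallelism);
    }
}
```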
>
> 2. All other cases
>
> In all other cases, the parallelism of both sources and their downstream
> operators until their common ancestor would be equal to MIN(p_s1, p_s2).
> That is, the minimum of the partition sizes of source1 and source2 will be
> selected as the parallelism.
> Coming back to your example, if source1 parallelism is 127 and source2
> parallelism is 128, then we will first check the partition size of source1
> and source2.
> Say partition size of source1 is 100 and partition size of source2 is 90.
> Then, we would set the parallelism for source1, source2, and all of their
> downstream operators until (and including) the join operator
> to 90 (min(100, 90)).
> We also plan to implement a cost-based decision instead of the rule-based
> one explained above (the MIN rule).
> One possible result of the cost-based estimation is to keep the partitions
> on one side and perform the shuffling on the other source.
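The MIN rule above is just a minimum over the two partition sizes; as an illustrative sketch (the class and method names are hypothetical):

```java
// Hypothetical sketch of case 2: pick MIN(p_s1, p_s2) as the parallelism
// for both sources and all downstream operators up to the join.
public class MinParallelismSketch {

    static int decideParallelism(int partitionSizeSource1, int partitionSizeSource2) {
        return Math.min(partitionSizeSource1, partitionSizeSource2);
    }

    public static void main(String[] args) {
        // Example from the text: partition sizes 100 and 90 -> parallelism 90,
        // regardless of the original operator parallelisms (127 and 128).
        System.out.println(decideParallelism(100, 90)); // prints 90
    }
}
```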
>
>
>
> 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > we don't consider parallelism, we just remove unnecessary shuffles
> > according to the distribution columns. After this FLIP, the
> > parallelism may be bundled with source's partitions, then how will
> > this optimization accommodate with FlinkExpandConversionRule, will you
> > also change downstream operator's parallelisms if we want to also
> > remove subsequent shuffles?
>
>
>
> - From my understanding of FlinkExpandConversionRule, its removal logic is
> agnostic to operator parallelism.
> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
> then this FLIP will search another possible shuffle (the one closest to the
> source) to remove.
> If there is such an opportunity, this FLIP will remove the shuffle. So,
> from my understanding, FlinkExpandConversionRule and this optimization rule
> can work together safely.
> Please correct me if I misunderstood your question.
>
>
>
> Regarding the new optimization rule, have you also considered to allow
> > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > example, source is pre-partitioned by a, b columns, if we are
> > consuming this source, and do an aggregate on a, b, c, can we utilize
> > this optimization?
>
>
> - Good point. Yes, there are some cases that non-strict mode will apply.
> For example:
>
> - pre-partitioned columns and aggregate columns are the same but have a
> different order (e.g., source is pre-partitioned w.r.t. a,b and the
> aggregate has a GROUP BY b,a)
> - columns in the Exchange operator are a list-prefix of the pre-partitioned
> columns of the source (e.g., source is pre-partitioned w.r.t. a,b,c and the
> Exchange's partition columns are a,b)
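These two non-strict cases could be checked roughly as follows (a hypothetical sketch, not the actual rule implementation):

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical sketch of a non-strict match check: the shuffle can be
// removed when the Exchange keys are a permutation of the source's
// pre-partitioned columns, or a list-prefix of them.
public class NonStrictMatchSketch {

    // Same columns, possibly different order (e.g., a,b vs. GROUP BY b,a).
    static boolean isPermutation(List<String> partitionCols, List<String> exchangeCols) {
        return partitionCols.size() == exchangeCols.size()
                && new HashSet<>(partitionCols).equals(new HashSet<>(exchangeCols));
    }

    // Exchange columns are a list-prefix of the pre-partitioned columns
    // (e.g., Exchange on a,b over a source pre-partitioned by a,b,c).
    static boolean isPrefix(List<String> partitionCols, List<String> exchangeCols) {
        return exchangeCols.size() <= partitionCols.size()
                && partitionCols.subList(0, exchangeCols.size()).equals(exchangeCols);
    }

    public static void main(String[] args) {
        System.out.println(isPermutation(List.of("a", "b"), List.of("b", "a"))); // true
        System.out.println(isPrefix(List.of("a", "b", "c"), List.of("a", "b"))); // true
        System.out.println(isPrefix(List.of("a", "b", "c"), List.of("b", "c"))); // false
    }
}
```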
>
> Please let me know if the above answers your questions or if you have any
> other comments.
>
> Regards,
> Jeyhun
>
> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
>
> > Thanks Jeyhun for bringing up this discussion, it is really exciting,
> > +1 for the general idea.
> >
> > We also introduced a similar concept in Flink Batch internally to cope
> > with bucketed tables in Hive, it is a very important improvement.
> >
> > > One thing to note is that for join queries, the parallelism of each join
> > > source might be different. This might result in
> > > inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
> > > different mappings of partitions to source operators).
> > > Therefore, it is the job of the planner to detect this and adjust the
> > > parallelism. With that in mind,
> > > the rest (how the split assigners perform) is consistent among many
> > > sources.
> >
> > Could you elaborate a little more on this. I added my two cents here
> > about this part:
> > 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor, like 127 + 128,
> > could we just utilize one side's pre-partitioned attribute, and let
> > the other side just do the shuffle?
> > 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > we don't consider parallelism, we just remove unnecessary shuffles
> > according to the distribution columns. After this FLIP, the
> > parallelism may be bundled with source's partitions, then how will
> > this optimization accommodate with FlinkExpandConversionRule, will you
> > also change downstream operator's parallelisms if we want to also
> > remove subsequent shuffles?
> >
> >
> > Regarding the new optimization rule, have you also considered to allow
> > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > example, source is pre-partitioned by a, b columns, if we are
> > consuming this source, and do an aggregate on a, b, c, can we utilize
> > this optimization?
> >
> > Jane Chan <qingyue....@gmail.com> wrote on Thu, Mar 14, 2024 at 15:24:
> > >
> > > Hi Jeyhun,
> > >
> > > Thanks for your clarification.
> > >
> > > > Once a new partition is detected, we add it to our existing mapping. Our
> > > > mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> > > > where it maps each source subtaskID to zero or more partitions.
> > >
> > > I understand your point.  **It would be better if you could sync the
> > > content to the FLIP**.
> > >
> > > Another thing is I'm curious about what the physical plan looks like. Is
> > > there any specific info that will be added to the table source (like
> > > filter/project pushdown)? It would be great if you could attach an
> > > example to the FLIP.
> > >
> > > Bests,
> > > Jane
> > >
> > > On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> > >
> > > > Hi Jane,
> > > >
> > > > Thanks for your comments.
> > > >
> > > >
> > > > > 1. Concerning the `sourcePartitions()` method, the partition information
> > > > > returned during the optimization phase may not be the same as the partition
> > > > > information during runtime execution. For long-running jobs, partitions may
> > > > > be continuously created. Is this FLIP equipped to handle such scenarios?
> > > >
> > > >
> > > > - Good point. This scenario is definitely supported.
> > > > Once a new partition is added, or in general, new splits are discovered, the
> > > > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
> > > > method will be called. Inside that method, we are able to detect whether a
> > > > split belongs to existing partitions or to a new partition.
> > > > Once a new partition is detected, we add it to our existing mapping. Our
> > > > mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> > > > where it maps each source subtaskID to zero or more partitions.
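A rough sketch of how such a mapping could be maintained when a new partition shows up (illustrative only; the round-robin assignment and all names here are assumptions, not the actual PartitionAwareSplitAssigner logic):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of maintaining subtaskToPartitionAssignment when a
// new partition is discovered at runtime.
public class PartitionAssignmentSketch {

    // Maps each source subtask ID to the set of partition IDs it reads.
    final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    final int numSubtasks;

    PartitionAssignmentSketch(int numSubtasks) {
        this.numSubtasks = numSubtasks;
    }

    // Assigns a newly detected partition to a subtask (here: round-robin
    // by partition id, purely for illustration).
    void onNewPartition(int partitionId) {
        int subtask = partitionId % numSubtasks;
        subtaskToPartitionAssignment
                .computeIfAbsent(subtask, k -> new HashSet<>())
                .add(partitionId);
    }

    public static void main(String[] args) {
        PartitionAssignmentSketch assigner = new PartitionAssignmentSketch(2);
        assigner.onNewPartition(0);
        assigner.onNewPartition(1);
        assigner.onNewPartition(2); // partition discovered later -> subtask 0
        System.out.println(assigner.subtaskToPartitionAssignment);
    }
}
```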
> > > >
> > > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > > > understand that it is also necessary to verify whether the hash key within
> > > > > the Exchange node is consistent with the partition key defined in the table
> > > > > source that implements `SupportsPartitioning`.
> > > >
> > > >
> > > > - Yes, I overlooked that point, fixed. Actually, the rule is much more
> > > > complicated; I tried to simplify it in the FLIP. Good point.
> > > >
> > > >
> > > > > 3. Could you elaborate on the desired physical plan and integration with
> > > > > `CompiledPlan` to enhance the overall functionality?
> > > >
> > > >
> > > > - For the compiled plan, PartitioningSpec will be used, with a JSON tag
> > > > "Partitioning". As a result, in the compiled plan, the source operator
> > > > will have
> > > > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
> > > > plan.
> > > > More about the implementation details below:
> > > >
> > > > --------------------------------
> > > > PartitioningSpec class
> > > > --------------------------------
> > > > @JsonTypeName("Partitioning")
> > > > public final class PartitioningSpec extends SourceAbilitySpecBase {
> > > >     // some code here
> > > >     @Override
> > > >     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> > > >         if (tableSource instanceof SupportsPartitioning) {
> > > >             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> > > >         } else {
> > > >             throw new TableException(
> > > >                     String.format(
> > > >                             "%s does not support SupportsPartitioning.",
> > > >                             tableSource.getClass().getName()));
> > > >         }
> > > >     }
> > > >     // some code here
> > > > }
> > > >
> > > > --------------------------------
> > > > SourceAbilitySpec class
> > > > --------------------------------
> > > > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> > > > @JsonSubTypes({
> > > >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > > >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > > >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > > >     @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> > > > })
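For illustration, a source node in a compiled plan would then carry the new ability entry roughly like this (the surrounding structure is abbreviated and hypothetical; only the "abilities" entry is the point):

```json
{
  "scanTableSource": {
    "abilities": [
      { "type": "Partitioning" }
    ]
  }
}
```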
> > > >
> > > >
> > > > Please let me know if that answers your questions or if you have other
> > > > comments.
> > > >
> > > > Regards,
> > > > Jeyhun
> > > >
> > > >
> > > > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
> > wrote:
> > > >
> > > > > Hi Jeyhun,
> > > > >
> > > > > Thank you for leading the discussion. I'm generally +1 with this
> > > > proposal,
> > > > > along with some questions. Please see my comments below.
> > > > >
> > > > > 1. Concerning the `sourcePartitions()` method, the partition information
> > > > > returned during the optimization phase may not be the same as the partition
> > > > > information during runtime execution. For long-running jobs, partitions may
> > > > > be continuously created. Is this FLIP equipped to handle such scenarios?
> > > > >
> > > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > > > understand that it is also necessary to verify whether the hash key within
> > > > > the Exchange node is consistent with the partition key defined in the table
> > > > > source that implements `SupportsPartitioning`.
> > > > >
> > > > > 3. Could you elaborate on the desired physical plan and integration with
> > > > > `CompiledPlan` to enhance the overall functionality?
> > > > >
> > > > > Best,
> > > > > Jane
> > > > >
> > > > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes <jhug...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Jeyhun,
> > > > > >
> > > > > > I like the idea!  Given FLIP-376 [1], I wonder if it'd make sense to
> > > > > > generalize FLIP-434 to be about "pre-divided" data to cover "buckets"
> > > > > > and "partitions" (and maybe even situations where a data source is
> > > > > > partitioned and bucketed).
> > > > > >
> > > > > > Separate from that, the page mentions TPC-H Q1 as an example. For a
> > > > > > join, any two tables joined on the same bucket key would provide a
> > > > > > concrete example. Systems like Kafka Streams/ksqlDB call this
> > > > > > "co-partitioning" [2]; for those systems, it is a requirement placed
> > > > > > on the input sources. For Flink, with FLIP-434, the proposed planner
> > > > > > rule could remove the shuffle.
> > > > > >
> > > > > > Definitely a fun idea; I look forward to hearing more!
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Jim
> > > > > >
> > > > > >
> > > > > > 1.
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > > > 2.
> > > > > > https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > > > >
> > > > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > I’d like to start a discussion on FLIP-434: Support optimizations
> > > > > > > for pre-partitioned data sources [1].
> > > > > > >
> > > > > > > The FLIP introduces taking advantage of pre-partitioned data sources
> > > > > > > for SQL/Table API (it is already supported as an experimental feature
> > > > > > > in the DataStream API [2]).
> > > > > > >
> > > > > > >
> > > > > > > Please find more details in the FLIP wiki document [1].
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jeyhun
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > > > > [2]
> > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >



-- 

Best,
Benchao Li
