Hi,

Thanks, all, for the valuable discussion on this FLIP. +1 for implementing
dynamic partition pruning / dynamic filtering pushdown, since it is a key
optimization for improving batch processing performance.

Also, since speculative execution is being introduced for batch processing [1],
we might need to consider the case where speculative execution is enabled:
1. The operator coordinator of DynamicFilteringDataCollector should ignore any
subsequent filtering data, since a task might be executed in multiple attempts.
2. The DynamicFileSplitEnumerator should also implement the
`SupportsHandleExecutionAttemptSourceEvent` interface; otherwise it would throw
an exception when it receives the filtering data source event.
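
To make point 1 a bit more concrete, here is a minimal sketch in plain Java.
It is not the actual Flink code: the class FirstAttemptWinsCollector and its
methods are hypothetical names made up for illustration, and the filtering
data is modelled as a plain String. The idea is only that the coordinator keeps
the first filtering data reported for a subtask and ignores whatever later
attempts of the same subtask send.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the coordinator; not the real Flink class. */
public class FirstAttemptWinsCollector {

    // Subtask index -> filtering data accepted for that subtask.
    private final Map<Integer, String> acceptedBySubtask = new HashMap<>();

    /**
     * Called once per filtering-data event. The attempt number is only part
     * of the signature to show that several attempts may report for the same
     * subtask; this sketch does not need it for the decision.
     *
     * @return true if the data was accepted, false if it was ignored because
     *     another attempt of the same subtask had already reported.
     */
    public boolean onFilteringData(int subtaskIndex, int attemptNumber, String filteringData) {
        // putIfAbsent returns null only when no earlier value was stored.
        return acceptedBySubtask.putIfAbsent(subtaskIndex, filteringData) == null;
    }

    /** Returns the accepted filtering data, or null if none has arrived yet. */
    public String acceptedData(int subtaskIndex) {
        return acceptedBySubtask.get(subtaskIndex);
    }
}
```

With this sketch, a second call for subtask 0 (for example from attempt 1
after attempt 0 has already reported) returns false and leaves the stored data
untouched. A real implementation may additionally want to check that the
attempts produced identical data.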

Best,
Yun Gao



[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job



------------------------------------------------------------------
From:Jing Ge <j...@ververica.com>
Send Time:2022 Jul. 21 (Thu.) 18:56
To:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Hi,

Thanks for the informative discussion! Looking forward to using dynamic
filtering provided by Flink.

Best regards,
Jing

On Tue, Jul 19, 2022 at 3:22 AM godfrey he <godfre...@gmail.com> wrote:

> Hi Jingsong, Jark, Jing,
>
> Thanks for the important input.
> Lake storage is a very important scenario. Considering more generic
> and extended cases,
> I would also like to use the "dynamic filtering" concept instead of
> "dynamic partition".
>
> >maybe the FLIP should also demonstrate the EXPLAIN result, which
> is also an API.
> I will add a section to describe the EXPLAIN result.
>
> >Does DPP also support streaming queries?
> Yes, but only for bounded sources.
>
> >it requires that the SplitEnumerator implement the newly introduced
> `SupportsHandleExecutionAttemptSourceEvent` interface,
> +1
>
> I will update the document and the POC code.
>
> Best,
> Godfrey
>
> On Wed, Jul 13, 2022 at 8:22 PM Jing Zhang <beyond1...@gmail.com> wrote:
> >
> > Hi Godfrey,
> > Thanks for driving this discussion.
> > This is an important improvement for batch SQL jobs.
> > I agree with Jingsong that the capability should be expanded to more
> > than just partitions.
> > Besides, I have two points:
> > 1. Based on FLIP-248[1],
> >
> > > Dynamic partition pruning mechanism can improve performance by avoiding
> > > reading large amounts of irrelevant data, and it works for both batch
> > > and streaming queries.
> >
> > Does DPP also support streaming queries?
> > It seems the changes proposed in FLIP-248 do not work for streaming
> > queries, because the dimension table might be an unbounded input.
> > Or does it require all dimension tables to be bounded inputs if a
> > streaming job wants to enable DPP?
> >
> > 2. I notice there are changes to the SplitEnumerator for the Hive source
> > and the File source.
> > They now depend on SourceEvent to pass PartitionData.
> > According to FLIP-245 [2], if speculative execution is enabled for
> > FLIP-27 based sources that use SourceEvent,
> > the SplitEnumerator must implement the newly introduced
> > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > otherwise an exception will be thrown.
> > Since the Hive and File sources are commonly used for batch jobs, it is
> > better to take this point into consideration.
> >
> > Best,
> > Jing Zhang
> >
> > [1] FLIP-248:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > [2] FLIP-245:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
> >
> > On Tue, Jul 12, 2022 at 1:16 PM Jark Wu <imj...@gmail.com> wrote:
> >
> > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > Pushdown in which the join key contains partition fields. Extending
> > > this FLIP to general filter pushdown can benefit more optimizations,
> > > and they can share the same interface.
> > >
> > > For example, the Trino Hive connector leverages dynamic filtering to
> > > support:
> > > - dynamic partition pruning for partitioned tables
> > > - dynamic bucket pruning for bucketed tables
> > > - dynamic filters pushed into the ORC and Parquet readers to perform
> > >   stripe or row-group pruning and save on disk I/O.
> > >
> > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > Dynamic Filtering), just like Trino [1]. The interfaces should also be
> > > adapted for that.
> > >
> > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > > which is also an API.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > >
> > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > > Thanks Godfrey for driving.
> > > >
> > > > I like this FLIP.
> > > >
> > > > We can extend this capability to more than just partitions.
> > > > Here are some inputs from Lake Storage.
> > > >
> > > > The format of the splits generated by Lake Storage is roughly as
> > > > follows:
> > > > Split {
> > > >    Path filePath;
> > > >    Statistics[] fieldStats;
> > > > }
> > > >
> > > > Stats contain the min and max of each column.
> > > >
> > > > If the storage is sorted by a column, split filtering on that column
> > > > will be very effective, so not only the partition field but also
> > > > this column is worth a RuntimeFilter push-down.
> > > > Only the source knows this information, so I suggest that the source
> > > > return which fields are worth pushing down.
> > > >
> > > > My overall point is:
> > > > This FLIP can be extended to support Source Runtime Filter push-down
> > > > for all fields, not just dynamic partition pruning.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <godfre...@gmail.com> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > > > partition pruning.
> > > > >
> > > > > Currently, Flink supports static partition pruning: the conditions
> > > > > in the WHERE clause are analyzed during the optimization phase to
> > > > > determine in advance which partitions can be safely skipped.
> > > > > Another common scenario is that the partition information is not
> > > > > available in the optimization phase, only in the execution phase.
> > > > > That is the problem this FLIP tries to solve: dynamic partition
> > > > > pruning, which can reduce the I/O of the partitioned table source.
> > > > >
> > > > > The query pattern looks like:
> > > > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > > > d_date_sk and d_year = 2000
> > > > >
> > > > > We will introduce a mechanism for detecting dynamic partition
> > > > > pruning patterns in the optimization phase
> > > > > and performing partition pruning at runtime by sending the
> > > > > dimension table results to the SplitEnumerator
> > > > > of the fact table via the existing coordinator mechanism.
> > > > >
> > > > > You can find more details in the FLIP-248 document [1].
> > > > > Looking forward to any feedback.
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > >
> > > > >
> > > > > Best,
> > > > > Godfrey
> > > >