Hi, godfrey and Jark, thanks for joining the discussion.

The implications for FileSource


Sorry about that, I missed an important feature: the FileSource already
supports continuous reading. I think we can do the same thing for the
FileSource, that is, use the partition pruning function to filter out
unneeded partitions.

The function will be held by `FileSystemTableSource`, `FileSource`,
`AbstractFileSource` and `ContinuousFileSplitEnumerator`. Finally, it will
be used in `ContinuousFileSplitEnumerator#processDiscoveredSplits`, where
the splits belonging to unneeded partitions are dropped; you can refer
to [1].

I think the pruning function will only affect streaming reading; batch
reading will stay the same.
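
To make the dropping step concrete, here is a minimal sketch; it is not the
POC code. It uses plain path strings as stand-ins for splits and assumes
Hive-style `key=value` partition directories. `SplitPruningSketch`,
`partitionSpecOf` and `dropUnneededSplits` are hypothetical names used only
for illustration; the real change would sit in
`ContinuousFileSplitEnumerator#processDiscoveredSplits`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Illustration only: paths stand in for file splits (assumption, not Flink's split type). */
final class SplitPruningSketch {

    /** Extracts Hive-style key=value directory segments from a path, ignoring the file name. */
    static Map<String, String> partitionSpecOf(String path) {
        Map<String, String> spec = new LinkedHashMap<>();
        String[] segments = path.split("/");
        // The last segment is the file name, so it is skipped.
        for (int i = 0; i < segments.length - 1; i++) {
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                spec.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        return spec;
    }

    /** Keeps only the splits whose partition passes the pruning function. */
    static List<String> dropUnneededSplits(
            Collection<String> discoveredSplits,
            Predicate<Map<String, String>> partitionFilter) {
        List<String> kept = new ArrayList<>();
        for (String split : discoveredSplits) {
            if (partitionFilter.test(partitionSpecOf(split))) {
                kept.add(split);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> discovered = List.of(
                "/warehouse/t/dt=2022-07-03/part-0",
                "/warehouse/t/dt=2022-07-04/part-0");
        // Hypothetical pruning function: only the partition dt=2022-07-04 is needed.
        Predicate<Map<String, String>> filter = spec -> "2022-07-04".equals(spec.get("dt"));
        System.out.println(dropUnneededSplits(discovered, filter));
    }
}
```

Because the filter runs on every newly discovered batch of splits, it also
covers partitions that are committed after the job has started.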


 About `FilterFunction<CatalogPartitionSpec> partitionFilter`.


I agree that a `FilterFunction` is enough, and that `CatalogPartitionSpec`
is a more meaningful and suitable input type than `RowData`. I propose we
combine `PartitionSpecToRowData` and `FilterLogic` in the same function.
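
As a rough sketch of what "conversion plus filter logic in one function"
could look like (plain Java, no Flink classes): `PartitionFilter` is a
hypothetical stand-in for `FilterFunction<CatalogPartitionSpec>`, the spec
is modelled as a `Map<String, String>`, and the `dt >= minDt` condition is
an invented example predicate.

```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;

/** Illustration only: all names here are hypothetical stand-ins. */
final class PartitionFilterSketch {

    /** Stand-in for a filter function over a partition spec. */
    interface PartitionFilter extends Serializable {
        boolean filter(Map<String, String> partitionSpec);
    }

    /** Conversion step: lays out the spec values in partition-key order. */
    static String[] toRow(List<String> partitionKeys, Map<String, String> spec) {
        String[] row = new String[partitionKeys.size()];
        for (int i = 0; i < row.length; i++) {
            row[i] = spec.get(partitionKeys.get(i));
        }
        return row;
    }

    /** One function doing both steps: convert the spec, then evaluate dt >= minDt. */
    static PartitionFilter dtAtLeast(List<String> partitionKeys, String minDt) {
        int dtIndex = partitionKeys.indexOf("dt");
        if (dtIndex < 0) {
            throw new IllegalArgumentException("table is not partitioned by dt");
        }
        return spec -> {
            String[] row = toRow(partitionKeys, spec);
            return row[dtIndex] != null && row[dtIndex].compareTo(minDt) >= 0;
        };
    }

    public static void main(String[] args) {
        PartitionFilter filter = dtAtLeast(List.of("dt", "hr"), "2022-07-04");
        System.out.println(filter.filter(Map.of("dt", "2022-07-05", "hr", "01")));
        System.out.println(filter.filter(Map.of("dt", "2022-07-03", "hr", "23")));
    }
}
```

With this shape the connector only ever sees the partition spec; the row
layout stays an internal detail of the function.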

I think it is better to pass the `FilterFunction` to the connector side by
invoking `applyPartitionPuringFunction`, and I tried to complete a simple
POC [1]. Unfortunately, I hit a blocker related to class loading: the
code-generated function cannot be passed from the client to the JobMaster.
The exception is shown in the UT [2]. Am I missing something important?


As far as I can tell, we cannot support passing an already instantiated
code-generated function from the client to the JobManager or TaskManager,
so `GeneratedFunction<FilterFunction>` is the right choice. However, the
classes for generated code are defined in flink-table-runtime, while
`SupportsPartitionPushDown` is defined in flink-table-common, and the
`GeneratedFunction` could be seen as the input type in
`applyPartitionPuringFunction`.



About how the `applyPartitionPuringFunction` method affects batch/bounded
table sources


As the code shows, the partition pruning function will only be used in
streaming mode and won't be called in batch mode. Although we could unify
batch and streaming mode, I think it would bring some drawbacks that
affect statistics and parallelism inference.



[1]
https://github.com/apache/flink/compare/master...zoucao:flink:dynamic-partition-pruning

[2]
https://github.com/zoucao/flink/blob/c37d984169c4d099d3ca0458414175168c6e98af/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java#L178

Best regards,

zoucao


Jark Wu <imj...@gmail.com> wrote on Mon, Jul 4, 2022 at 20:07:

> Hi zoucao,
>
> Regarding the pruning function, maybe a simple filter function is enough,
>  e.g. `FilterFunction<CatalogPartitionSpec> partitionFilter`.
>
> Besides, it would be better to state clearly how the new
> `applyPartitionPuringFunction`
> method affects batch/bounded table sources. From my understanding,
> this method won't be called in batch mode?
>
> Best,
> Jark
>
> On Mon, 4 Jul 2022 at 19:40, Martijn Visser <martijnvis...@apache.org>
> wrote:
>
> > Hi zoucao,
> >
> > The FileSource does support streaming reading [1].
> >
> > Best regards,
> >
> > Martijn
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html
> >
> > On Mon, 4 Jul 2022 at 05:58, godfrey he <godfre...@gmail.com> wrote:
> >
> > > Hi zoucao,
> > >
> > > Looking forward to your FLIP.
> > >
> > > >For Batch reading, the 'remainingPartitions' will be seen as the
> > > partitions
> > > >needed to consume, for streaming reading, we use the
> > > >'partitionPruningFunction' to ignore the unneeded partitions.
> > > For a bounded source (maybe batch or streaming), `applyPartitions`
> > > should be used, while `applyPartitionPuringFunction` can be used
> > > only for an unbounded source.
> > >
> > > Best,
> > > Godfrey
> > >
> > > cao zou <zoucao...@gmail.com> wrote on Mon, Jul 4, 2022 at 11:04:
> > > >
> > > > Hi Martijn, thanks for your attention. I'm glad to create a FLIP;
> > > > could you help give me the permission?
> > > > My ID is zoucao, and my mail is zoucao...@gmail.com.
> > > >
> > > > The implications for FileSource
> > > >
> > > > In the above discussion, only HiveSource has been involved, because
> > > > it holds a continuous partition fetcher, but FileSource does not. If
> > > > we do the streaming pruning only in the partition fetcher, it will
> > > > not affect the FileSource. If the FileSource supports streaming
> > > > reading in the future, the same changes can be applied to it.
> > > >
> > > > Best regards,
> > > > zoucao
> > > >
> > > > Martijn Visser <martijnvis...@apache.org> wrote on Fri, Jul 1, 2022 at 16:20:
> > > >
> > > > > Hi zoucao,
> > > > >
> > > > > I think this topic deserves a proper FLIP and a vote. This
> > > > > approach is focussed only on Hive, but I would also like to
> > > > > understand the implications for FileSource. Can you create one?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Martijn
> > > > >
> > > > > On Wed, 22 Jun 2022 at 18:50, cao zou <zoucao...@gmail.com> wrote:
> > > > >
> > > > > > Hi devs, I want to start a discussion to find a way to support
> > > > > > partition pruning for streaming reading.
> > > > > >
> > > > > >
> > > > > > Flink already supports partition pruning. The implementation
> > > > > > consists of *Source Ability*, *Logical Rule*, and the interface
> > > > > > *SupportsPartitionPushDown*, but they all only take effect in
> > > > > > batch reading. When reading a table in streaming mode, the
> > > > > > existing mechanism causes the problems reported in FLINK-27898
> > > > > > <https://issues.apache.org/jira/browse/FLINK-27898>[1], and
> > > > > > records that should be filtered out are sent downstream.
> > > > > >
> > > > > > This discussion is proposed to address that drawback. Hive and
> > > > > > other big data systems that store data in partitions will
> > > > > > benefit most from it.
> > > > > >
> > > > > > Currently, the existing partitions that need to be consumed are
> > > > > > generated in *PushPartitionIntoTableSourceScanRule* and then
> > > > > > pushed into the TableSource. This works well in batch mode, but
> > > > > > it is not enough if we want to read records from Hive in
> > > > > > streaming mode and take the partitions committed in the future
> > > > > > into account.
> > > > > >
> > > > > > To support pruning the partitions committed in the future, the
> > > > > > pruning function should be pushed into the TableSource and then
> > > > > > delivered to *ContinuousPartitionFetcher*, so that the pruning
> > > > > > for uncommitted partitions can be invoked there.
> > > > > >
> > > > > > Before proposing the changes, I think it is necessary to clarify
> > > > > > the existing pruning logic. The main logic of the pruning in
> > > > > > *PushPartitionIntoTableSourceScanRule* is as follows.
> > > > > >
> > > > > > First, a pruning function called partitionPruner is generated;
> > > > > > it extends RichMapFunction<GenericRowData, Boolean>.
> > > > > >
> > > > > >
> > > > > > if tableSource.listPartitions() is not empty:
> > > > > >   partitions = dynamicTableSource.listPartitions()
> > > > > >   for p in partitions:
> > > > > >     predicate = partitionPruner.map(convertPartitionToRow(p))
> > > > > >     if predicate: add p to partitionsAfterPruning
> > > > > > else:  # tableSource.listPartitions() is empty
> > > > > >   if the filter can be converted to ResolvedExpression &&
> > > > > >      the catalog supports the filter:
> > > > > >     # every partition returned here is needed
> > > > > >     partitionsAfterPruning = catalog.listPartitionsByFilter()
> > > > > >   else:
> > > > > >     partitions = catalog.listPartitions()
> > > > > >     for p in partitions:
> > > > > >       predicate = partitionPruner.map(convertPartitionToRow(p))
> > > > > >       if predicate: add p to partitionsAfterPruning
> > > > > >
> > > > > > I think the main logic can be split into two sides: one lives in
> > > > > > the logical rule, and the other lives on the connector side. The
> > > > > > catalog info should be used on the rule side and not on the
> > > > > > connector side. The pruning function could be used on both of
> > > > > > them, or unified on the connector side.
> > > > > >
> > > > > >
> > > > > > Proposed changes
> > > > > >
> > > > > >
> > > > > >    - add a new method in SupportsPartitionPushDown
> > > > > >    - let HiveSourceTable, HiveSourceBuilder, and
> > > > > >    HiveContinuousPartitionFetcher hold the pruning function.
> > > > > >    - pruning after fetchPartitions invoked.
> > > > > >
> > > > > > Considering version compatibility and the optimization for
> > > > > > listing partitions with a filter in the catalog, I think we can
> > > > > > add a new method in *SupportsPartitionPushDown*:
> > > > > >
> > > > > > /**
> > > > > >  * Provides a list of remaining partitions. After those
> > > > > >  * partitions are applied, a source must not read the data of
> > > > > >  * other partitions during runtime.
> > > > > >  *
> > > > > >  * <p>See the documentation of {@link SupportsPartitionPushDown}
> > > > > >  * for more information.
> > > > > >  */
> > > > > > void applyPartitions(List<Map<String, String>> remainingPartitions);
> > > > > >
> > > > > > /** Provides a pruning function for uncommitted partitions. */
> > > > > > default void applyPartitionPuringFunction(
> > > > > >     MapFunction<RowData, Boolean> partitionPruningFunction) {}
> > > > > >
> > > > > > We can push the generated function into the TableSource, so that
> > > > > > the ContinuousPartitionFetcher can get it.
> > > > > >
> > > > > > For batch reading, the 'remainingPartitions' will be treated as
> > > > > > the partitions that need to be consumed; for streaming reading,
> > > > > > we use the 'partitionPruningFunction' to ignore the unneeded
> > > > > > partitions.
> > > > > >
> > > > > > Rejected Alternatives
> > > > > >
> > > > > > Do not remove the filter logic on the partition keys from the
> > > > > > Filter node if the source will execute streaming reading.
> > > > > >
> > > > > >
> > > > > > Looking forward to your opinions.
> > > > > >
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-27898
> > > > > >
> > > > > > best
> > > > > >
> > > > > > zoucao
> > > > > >
> > > > >
> > >
> >
>
