Hi, thanks for your responses.

To Benchao:

Glad to hear about your work and requirements. I agree the interfaces should
be Public.

To Kurt:

1. Regarding "SupportsXXX" for ScanTableSource, LookupTableSource, or
DynamicTableSink: I don't think a "SupportsXXX" interface must work with all
three types. As Godfrey said, a LookupTableSource, for example, should not
extend SupportsWatermarkPushDown or SupportsComputedColumnPushDown. We just
try our best to make as many combinations work as possible. For example,
"SupportsParallelismReport" can work with both ScanTableSource and
DynamicTableSink.

As for adding the "reportParallelism" method directly to ScanTableSource and
DynamicTableSink: I think most sources/sinks would not want to see this
method. Providing a "SupportsXXX" interface aims to give connector developers
an opt-in choice.
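
To make the opt-in idea concrete, here is a minimal sketch. The interfaces
below are simplified stand-ins declared locally, not the real Flink classes,
and FileSource, PlainSource, and ParallelismResolver are hypothetical names
used only for illustration. It assumes the planner checks for the ability
with instanceof and falls back to its own default otherwise.

```java
// Simplified stand-ins for illustration only; these are NOT the real Flink interfaces.
interface DynamicTableSource {}

interface ScanTableSource extends DynamicTableSource {}

/** Opt-in ability as proposed in this thread: -1 means "use the system default". */
interface SupportsParallelismReport {
    int reportParallelism();
}

/** Hypothetical file-based source that infers parallelism from its file count. */
class FileSource implements ScanTableSource, SupportsParallelismReport {
    private final int fileCount;

    FileSource(int fileCount) {
        this.fileCount = fileCount;
    }

    @Override
    public int reportParallelism() {
        // One subtask per file, but never fewer than 1.
        return Math.max(1, fileCount);
    }
}

/** Hypothetical source that does not opt in and therefore never sees the method. */
class PlainSource implements ScanTableSource {}

/** Hypothetical planner-side helper (an assumption, not existing planner code). */
final class ParallelismResolver {
    private ParallelismResolver() {}

    static int resolveParallelism(DynamicTableSource source, int defaultParallelism) {
        if (source instanceof SupportsParallelismReport) {
            int reported = ((SupportsParallelismReport) source).reportParallelism();
            // Only positive values are used; -1 (or anything else) falls back to the default.
            if (reported > 0) {
                return reported;
            }
        }
        return defaultParallelism;
    }
}
```

With this shape, a source that has no opinion about parallelism implements
nothing extra, while a LookupTableSource simply would not implement the
ability.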

2. Regarding SupportsStatisticsReport not working for unbounded streaming
table sources: yes, that is true. Statistics (including catalog statistics)
are currently not relevant to streaming tables, but I think that in the
future we can create more useful statistics for streaming tables.

3. Regarding "oldStats" in SupportsStatisticsReport: it should be renamed to
"catalogStats". The source just tries its best to provide more useful and
accurate statistics, but as Godfrey said, it is a supplement to the catalog
statistics: it can only fill in information that is missing or inaccurate in
the catalog.
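
A minimal sketch of the "supplement" idea, under stated assumptions:
SimpleStats is a simplified local stand-in for TableStats (row count only,
-1 meaning unknown), FileStatsSource is a hypothetical connector, and the
policy shown (keep known catalog values, fill in only unknown ones) is just
one possible choice, not a settled design.

```java
// Simplified stand-in for illustration only; NOT Flink's TableStats class.
final class SimpleStats {
    static final long UNKNOWN = -1L;

    final long rowCount;

    SimpleStats(long rowCount) {
        this.rowCount = rowCount;
    }
}

/** The interface from this thread, with the parameter renamed to catalogStats. */
interface SupportsStatisticsReport {
    SimpleStats reportTableStatistics(SimpleStats catalogStats);
}

/** Hypothetical file-based source that knows the row count of the files left after push down. */
class FileStatsSource implements SupportsStatisticsReport {
    private final long rowsInRemainingFiles;

    FileStatsSource(long rowsInRemainingFiles) {
        this.rowsInRemainingFiles = rowsInRemainingFiles;
    }

    @Override
    public SimpleStats reportTableStatistics(SimpleStats catalogStats) {
        // Keep the catalog value when it is known; supplement it only when it is missing.
        if (catalogStats.rowCount != SimpleStats.UNKNOWN) {
            return catalogStats;
        }
        return new SimpleStats(rowsInRemainingFiles);
    }
}
```

A source that considers the catalog value inaccurate (for example, after
partition pruning) could instead return its own estimate in the first branch.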

4. Internal or Public: I am glad to see your requirements, and I am OK with
Public.

To Godfrey:

Regarding the case where there are multiple Transformations in the source op
and they require different parallelism: in that case, it should be left to
the source to set the parallelism. So these are two orthogonal things. Users
who do not use multiple Transformations still need a way to set parallelism.

Best,
Jingsong

On Thu, Jul 30, 2020 at 8:31 PM godfrey he <godfre...@gmail.com> wrote:

> Thanks Jingsong for bringing up this discussion,
>  and thanks Kurt for the detailed thoughts.
>
> First of all, I also think it's a very useful feature to expose more
> ability for table source.
>
> 1) If we want to support [1], it seems that SupportsParallelismReport
> does not meet the requirement when there are multiple Transformations in
> the source op and they require different parallelism.
>
> 2) Regarding "SupportsXXX" for ScanTableSource or LookupTableSource:
> currently, we also do not distinguish between them for the existing
> "SupportsXXX" interfaces. For example, a LookupTableSource should not
> extend SupportsWatermarkPushDown or SupportsComputedColumnPushDown.
> A DynamicTableSource subclass will extend a "SupportsXXX" interface only if
> it has that capability. So an unbounded table source should not extend
> SupportsStatisticsReport, or it should just return unknown for the
> unbounded case if the table source can work for both bounded and unbounded
> data.
>
> I think SupportsStatisticsReport is a supplement to catalog statistics,
> which means SupportsStatisticsReport only takes effect when the catalog
> statistics are unknown.
>
> 3) +1 to making them public.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18674
>
> Best,
> Godfrey
>
>
>
> Kurt Young <ykt...@gmail.com> wrote on Thu, Jul 30, 2020 at 4:01 PM:
>
> > Hi Jingsong,
> >
> > Thanks for bringing up this discussion. In general, I'm +1 to enriching
> > the source abilities with parallelism and stats reporting, but I'm not
> > sure whether introducing such a "SupportsXXXX" interface is a good idea.
> > I will share my thoughts separately.
> >
> > 1) Regarding the interface SupportsParallelismReport: first of all, my
> > feeling is that such a mechanism is not like other abilities such as
> > SupportsProjectionPushDown. The parallelism of the source operator would be
> > decided anyway; the only difference here is whether it is decided purely
> > by the framework or by the table source itself. So another angle to
> > understand this issue is that we can always assume a table source has the
> > ability to determine the parallelism. The table source can choose to set
> > the parallelism by itself, or delegate it to the framework.
> >
> > This might sound like personal taste, but there is another bad case if we
> > introduce the interface. You may already know we currently have two major
> > table sources, LookupTableSource and ScanTableSource. IIUC it won't make
> > much sense if the user provides a LookupTableSource and also implements
> > SupportsParallelismReport.
> >
> > An alternative solution would be to add the method you want directly to
> > ScanTableSource, with a default implementation returning -1, which means
> > letting the framework decide the parallelism.
> >
> > 2) Regarding the interface SupportsStatisticsReport: it seems this
> > interface doesn't work for unbounded streaming table sources. What kind of
> > implementation do you expect in such a case? And how does this interface
> > work with LookupTableSource?
> > Another question: what is the oldStats parameter used for?
> >
> > 3) Internal or Public: I don't think we should mark them as internal.
> > The fact that they are currently only used by internal connectors doesn't
> > mean these interfaces should be internal. I can imagine there will be lots
> > of Filesystem-like connectors outside the project which need such
> > capability.
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Jul 30, 2020 at 1:02 PM Benchao Li <libenc...@apache.org> wrote:
> >
> > > Hi Jingsong,
> > >
> > > Regarding SupportsParallelismReport,
> > > I think the streaming connectors can also benefit from it.
> > > I have seen some requests on the user ML from users who want to control
> > > the source/sink parallelism instead of setting it to the global
> > > parallelism.
> > > Also, in our company, we did this too.
> > >
> > > Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jul 30, 2020 at 11:16 AM:
> > >
> > > > Hi all,
> > > >
> > > > ## SupportsParallelismReport
> > > >
> > > > Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using
> > > > the old interfaces.
> > > >
> > > > We are considering migrating to the new interface.
> > > >
> > > > However, one problem is that in the old interface implementation,
> > > > connectors infer parallelism by themselves instead of using a global
> > > > parallelism configuration. Hive and Filesystem determine the parallelism
> > > > according to the number of files and the size of the files. In this way,
> > > > large tables may use a parallelism in the thousands, while small tables
> > > > only use a parallelism of 10, which can minimize the cost of task
> > > > scheduling.
> > > >
> > > > This situation is very common in batch computing. For example, in the
> > > > star model, a large table needs to be joined with multiple small tables.
> > > > So we should give this ability to new table source interfaces. The
> > > > interface can be:
> > > >
> > > > /**
> > > >  * Enables a source to report parallelism.
> > > >  *
> > > >  * <p>After filtering push down and partition push down, the source
> > > >  * can have more information, which can help it infer more effective
> > > >  * parallelism.
> > > >  */
> > > > @Internal
> > > > public interface SupportsParallelismReport {
> > > >
> > > >    /**
> > > >     * Reports parallelism from source or sink. The parallelism of an
> > > >     * operator must be at least 1, or -1 (use system default).
> > > >     */
> > > >    int reportParallelism();
> > > > }
> > > >
> > > >
> > > > Rejected Alternatives:
> > > > - SupportsSplitReport: What is the relationship between this split and
> > > > the split of FLIP-27? Do we have to match them one by one? I think they
> > > > are two independent things. In fact, in the design of FLIP-27, splits
> > > > and parallelism are not bound one to one.
> > > > - SupportsPartitionReport: What is a partition? Actually, in table/SQL,
> > > > partition is a special concept of a table. It should not be mixed with
> > > > parallelism.
> > > >
> > > > ## SupportsStatisticsReport
> > > >
> > > > As with parallelism, statistics information from the source will be
> > > > more appropriate and accurate. After filtering push down and partition
> > > > push down, the source can have more information, which can help it infer
> > > > more effective statistics. However, if we only infer from the planner
> > > > itself, it may lead to a big gap between the statistics information and
> > > > the real situation.
> > > >
> > > > The interface:
> > > >
> > > > /**
> > > >  * Enables a {@link ScanTableSource} to report table statistics.
> > > >  *
> > > >  * <p>Statistics can be inferred from real data in real time, so they
> > > >  * are more accurate than the statistics in the catalog.
> > > >  *
> > > >  * <p>After filtering push down and partition push down, the source
> > > >  * can have more information, which can help it infer more effective
> > > >  * table statistics.
> > > >  */
> > > > @Internal
> > > > public interface SupportsStatisticsReport {
> > > >
> > > >    /**
> > > >     * Reports {@link TableStats} from old table stats.
> > > >     */
> > > >    TableStats reportTableStatistics(TableStats oldStats);
> > > > }
> > > >
> > > >
> > > > When should the planner invoke the statistics report?
> > > > - First of all, this call can be expensive (it has to view the metadata
> > > > of the files), so it can't be called repeatedly.
> > > > - We need to call it after FilterPushdown, because that's when the
> > > > information is most accurate. We also need to call it before CBO (like
> > > > JoinReorder and choosing BroadcastJoin or ShuffleJoin), because that's
> > > > where statistics are used.
> > > >
> > > > Rejected Alternatives:
> > > > - Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
> > > > lean towards TableStats, because TableStats is the class used by the
> > > > planner, while CatalogTableStatistics may contain some catalog
> > > > information which is not related to the planner optimizer.
> > > >
> > > > ## Internal or Public
> > > >
> > > > I personally lean towards internal, since these interfaces are only used
> > > > by Hive and Filesystem. Another way is: SupportsParallelismReport
> > > > (Internal, I haven't seen this requirement from outside) and
> > > > SupportsStatisticsReport (Public, maybe the Apache Iceberg Flink
> > > > connector can use it).
> > > >
> > > > What do you think?
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>


-- 
Best, Jingsong Lee
