+1, thanks for the efforts.

On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi all,
>
> As Jark suggested in the VOTE thread, I created a JIRA:
> https://issues.apache.org/jira/browse/FLINK-15912
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi Timo,
> >
> > Good catch!
> >
> > I really like idea 2; a full Flink config looks very good to me.
> >
> > Trying to understand your first point: we don't actually have a
> > `TableIdentifier` class now, but TableFactory already indicates a table,
> > so I am OK with the rename.
> >
> > New Context should be:
> >
> >    /**
> >     * Context of table source creation. Contains table information and
> >     * environment information.
> >     */
> >    interface Context {
> >       /**
> >        * @return full identifier of the given {@link CatalogTable}.
> >        */
> >       ObjectIdentifier getObjectIdentifier();
> >       /**
> >        * @return table {@link CatalogTable} instance.
> >        */
> >       CatalogTable getTable();
> >       /**
> >        * @return readable config of this table environment.
> >        */
> >       ReadableConfig getConfiguration();
> >    }
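To make the benefit of `getConfiguration()` concrete, here is a minimal, self-contained sketch. All types below are simplified stand-ins (not the real Flink `ReadableConfig`, `ObjectIdentifier`, or `CatalogTable` classes), and the option key is only a hypothetical example:

```java
import java.util.HashMap;
import java.util.Map;

public class SourceFactorySketch {

    // Stand-in for Flink's ReadableConfig: read-only access to options.
    interface ReadableConfig {
        String getString(String key, String defaultValue);
    }

    // Stand-in for the proposed TableSourceFactory.Context.
    interface Context {
        String getObjectIdentifier();      // stand-in for ObjectIdentifier
        ReadableConfig getConfiguration(); // read-only environment config
    }

    // A hypothetical factory method that tunes the source from the
    // environment config instead of per-table connector properties.
    static String describeSource(Context ctx) {
        String parallelism = ctx.getConfiguration()
                .getString("example.source.parallelism", "1");
        return ctx.getObjectIdentifier() + " (parallelism=" + parallelism + ")";
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("example.source.parallelism", "4");
        Context ctx = new Context() {
            public String getObjectIdentifier() { return "cat.db.orders"; }
            public ReadableConfig getConfiguration() {
                return (key, def) -> conf.getOrDefault(key, def);
            }
        };
        System.out.println(describeSource(ctx)); // prints "cat.db.orders (parallelism=4)"
    }
}
```

The point of the sketch is only the shape of the API: the factory reads environment-level options through the context rather than requiring them to be duplicated in every table's connector properties.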
> >
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Jingsong,
> >>
> >> some last minute changes from my side:
> >>
> >> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> >> obvious. Otherwise people expect a `TableIdentifier` class being
> >> returned here.
> >>
> >> 2. rename `getTableConfig` to `getConfiguration()`. In the future this
> >> will not only be a "table" config but might give access to the full
> >> Flink config.
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >> On 04.02.20 06:27, Jingsong Li wrote:
> >> > So the interface will be:
> >> >
> >> > public interface TableSourceFactory<T> extends TableFactory {
> >> >     ......
> >> >
> >> >     /**
> >> >      * Creates and configures a {@link TableSource} based on the given
> >> >      * {@link Context}.
> >> >      *
> >> >      * @param context context of this table source.
> >> >      * @return the configured table source.
> >> >      */
> >> >     default TableSource<T> createTableSource(Context context) {
> >> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >        return createTableSource(
> >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> >                    tableIdentifier.getObjectName()),
> >> >              context.getTable());
> >> >     }
> >> >
> >> >     /**
> >> >      * Context of table source creation. Contains table information and
> >> >      * environment information.
> >> >      */
> >> >     interface Context {
> >> >        /**
> >> >         * @return full identifier of the given {@link CatalogTable}.
> >> >         */
> >> >        ObjectIdentifier getTableIdentifier();
> >> >
> >> >        /**
> >> >         * @return table {@link CatalogTable} instance.
> >> >         */
> >> >        CatalogTable getTable();
> >> >
> >> >        /**
> >> >         * @return readable config of this table environment.
> >> >         */
> >> >        ReadableConfig getTableConfig();
> >> >     }
> >> > }
> >> >
> >> > public interface TableSinkFactory<T> extends TableFactory {
> >> >     ......
> >> >
> >> >     /**
> >> >      * Creates and configures a {@link TableSink} based on the given
> >> >      * {@link Context}.
> >> >      *
> >> >      * @param context context of this table sink.
> >> >      * @return the configured table sink.
> >> >      */
> >> >     default TableSink<T> createTableSink(Context context) {
> >> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >        return createTableSink(
> >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> >                    tableIdentifier.getObjectName()),
> >> >              context.getTable());
> >> >     }
> >> >
> >> >     /**
> >> >      * Context of table sink creation. Contains table information and
> >> >      * environment information.
> >> >      */
> >> >     interface Context {
> >> >        /**
> >> >         * @return full identifier of the given {@link CatalogTable}.
> >> >         */
> >> >        ObjectIdentifier getTableIdentifier();
> >> >
> >> >        /**
> >> >         * @return table {@link CatalogTable} instance.
> >> >         */
> >> >        CatalogTable getTable();
> >> >
> >> >        /**
> >> >         * @return readable config of this table environment.
> >> >         */
> >> >        ReadableConfig getTableConfig();
> >> >     }
> >> > }
> >> >
> >> >
> >> > Best,
> >> > Jingsong Lee
> >> >
> >> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com>
> >> wrote:
> >> >
> >> >> Hi all,
> >> >>
> >> >> After rethinking and a discussion with Kurt, I'd like to remove
> >> >> "isBounded".
> >> >> We can delay the boundedness information to the TableSink.
> >> >> With the TableSink refactor, we need to consider "consumeDataStream"
> >> >> and "consumeBoundedStream".
> >> >>
> >> >> Best,
> >> >> Jingsong Lee
> >> >>
> >> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Hi Jark,
> >> >>>
> >> >>> Thanks for getting involved. Yes, adding isBounded on the source is
> >> >>> hard to understand.
> >> >>> I recommend adding it only to the sink at present, because the sink
> >> >>> has an upstream, and that upstream is either bounded or unbounded.
> >> >>>
> >> >>> Hi all,
> >> >>>
> >> >>> Let me summarize with your suggestions.
> >> >>>
> >> >>> public interface TableSourceFactory<T> extends TableFactory {
> >> >>>
> >> >>>     ......
> >> >>>
> >> >>>     /**
> >> >>>      * Creates and configures a {@link TableSource} based on the
> >> >>>      * given {@link Context}.
> >> >>>      *
> >> >>>      * @param context context of this table source.
> >> >>>      * @return the configured table source.
> >> >>>      */
> >> >>>     default TableSource<T> createTableSource(Context context) {
> >> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >>>        return createTableSource(
> >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> >>>                    tableIdentifier.getObjectName()),
> >> >>>              context.getTable());
> >> >>>     }
> >> >>>
> >> >>>     /**
> >> >>>      * Context of table source creation. Contains table information
> >> >>>      * and environment information.
> >> >>>      */
> >> >>>     interface Context {
> >> >>>
> >> >>>        /**
> >> >>>         * @return full identifier of the given {@link CatalogTable}.
> >> >>>         */
> >> >>>        ObjectIdentifier getTableIdentifier();
> >> >>>
> >> >>>        /**
> >> >>>         * @return table {@link CatalogTable} instance.
> >> >>>         */
> >> >>>        CatalogTable getTable();
> >> >>>
> >> >>>        /**
> >> >>>         * @return readable config of this table environment.
> >> >>>         */
> >> >>>        ReadableConfig getTableConfig();
> >> >>>     }
> >> >>> }
> >> >>>
> >> >>> public interface TableSinkFactory<T> extends TableFactory {
> >> >>>
> >> >>>     ......
> >> >>>
> >> >>>     /**
> >> >>>      * Creates and configures a {@link TableSink} based on the
> >> >>>      * given {@link Context}.
> >> >>>      *
> >> >>>      * @param context context of this table sink.
> >> >>>      * @return the configured table sink.
> >> >>>      */
> >> >>>     default TableSink<T> createTableSink(Context context) {
> >> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >>>        return createTableSink(
> >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> >>>                    tableIdentifier.getObjectName()),
> >> >>>              context.getTable());
> >> >>>     }
> >> >>>
> >> >>>     /**
> >> >>>      * Context of table sink creation. Contains table information
> >> >>>      * and environment information.
> >> >>>      */
> >> >>>     interface Context {
> >> >>>
> >> >>>        /**
> >> >>>         * @return full identifier of the given {@link CatalogTable}.
> >> >>>         */
> >> >>>        ObjectIdentifier getTableIdentifier();
> >> >>>
> >> >>>        /**
> >> >>>         * @return table {@link CatalogTable} instance.
> >> >>>         */
> >> >>>        CatalogTable getTable();
> >> >>>
> >> >>>        /**
> >> >>>         * @return readable config of this table environment.
> >> >>>         */
> >> >>>        ReadableConfig getTableConfig();
> >> >>>
> >> >>>        /**
> >> >>>         * @return whether or not the input of this sink is bounded.
> >> >>>         */
> >> >>>        boolean isBounded();
> >> >>>     }
> >> >>> }
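As a rough illustration of why `isBounded()` on the sink context is useful, here is a minimal, self-contained sketch. The types are simplified stand-ins, not the real Flink interfaces, and the two sink classes are purely hypothetical:

```java
public class SinkFactorySketch {

    // Stand-in for the proposed TableSinkFactory.Context; only the
    // boundedness flag is modeled here.
    interface Context {
        boolean isBounded();
    }

    interface TableSink {}

    // Two hypothetical sink implementations: one for bounded (batch-style)
    // input, one for unbounded (streaming) input.
    static class BoundedFileSink implements TableSink {}
    static class StreamingFileSink implements TableSink {}

    // With isBounded() on the context, a single factory method can pick
    // the right implementation instead of relying on an update-mode
    // property stored in the catalog.
    static TableSink createTableSink(Context context) {
        return context.isBounded() ? new BoundedFileSink() : new StreamingFileSink();
    }

    public static void main(String[] args) {
        TableSink sink = createTableSink(() -> true);
        System.out.println(sink.getClass().getSimpleName()); // prints "BoundedFileSink"
    }
}
```

This matches the argument above: the sink always has an upstream, and that upstream is either bounded or unbounded, so a single boolean on the creation context is enough to select the implementation.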
> >> >>>
> >> >>> If there is no objection, I will start a vote thread. (If necessary,
> >> >>> I can also write a FLIP.)
> >> >>>
> >> >>> Best,
> >> >>> Jingsong Lee
> >> >>>
> >> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>> Thanks Bowen and Timo for involving.
> >> >>>>
> >> >>>> Hi Bowen,
> >> >>>>
> >> >>>>> 1. is it better to have explicit APIs like
> >> >>>> "createBatchTableSource(...)"
> >> >>>> I think it is better to keep one method, since in [1] we reached
> >> >>>> consensus in the DataStream layer to maintain a single API in
> >> >>>> "env.source". It is good not to split batch and stream, and our
> >> >>>> TableSource/TableSink are the same classes for both batch and
> >> >>>> streaming too.
> >> >>>>
> >> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext
> class.
> >> >>>> As Timo said, we may have more parameters to add in the future;
> >> >>>> take a look at "AbstractRichFunction.RuntimeContext", which was
> >> >>>> extended little by little.
> >> >>>>
> >> >>>> Hi Timo,
> >> >>>>
> >> >>>> Your suggestion about Context looks good to me.
> >> >>>> "TablePath" is used in Hive for updating the catalog information of
> >> >>>> this table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
> >> >>>>
> >> >>>>> Can we postpone the change of TableValidators?
> >> >>>> Yes, ConfigOption validation looks good to me. It seems that you
> >> >>>> have been thinking about this for a long time; it's very good.
> >> >>>> Looking forward to the progress of FLIP-54.
> >> >>>>
> >> >>>> [1]
> >> >>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> >> >>>>
> >> >>>> Best,
> >> >>>> Jingsong Lee
> >> >>>>
> >> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org>
> >> wrote:
> >> >>>>
> >> >>>>> Hi Jingsong,
> >> >>>>>
> >> >>>>> +1 for adding a context in the source and sink factories. A
> context
> >> >>>>> class also allows for future modifications without touching the
> >> >>>>> TableFactory interface again.
> >> >>>>>
> >> >>>>> How about:
> >> >>>>>
> >> >>>>> interface TableSourceFactory {
> >> >>>>>       interface Context {
> >> >>>>>          // ...
> >> >>>>>       }
> >> >>>>> }
> >> >>>>>
> >> >>>>> Because I find the name `CatalogTableContext` confusing, and we can
> >> >>>>> bind the interface to the factory class itself as an inner
> >> >>>>> interface.
> >> >>>>>
> >> >>>>> Readable access to the configuration also sounds right to me. Can
> >> >>>>> we remove the `ObjectPath getTablePath()` method? I don't see a
> >> >>>>> reason why a factory should know the path. And if we keep it, it
> >> >>>>> should be an `ObjectIdentifier` instead, to also know about the
> >> >>>>> catalog we are using.
> >> >>>>>
> >> >>>>> The `isStreamingMode()` should be renamed to `isBounded()` because
> >> we
> >> >>>>> would like to use terminology around boundedness rather than
> >> >>>>> streaming/batch.
> >> >>>>>
> >> >>>>> @Bowen: We are in the process of unifying the APIs and thus
> >> explicitly
> >> >>>>> avoid specialized methods in the future.
> >> >>>>>
> >> >>>>> Can we postpone the change of TableValidators? I don't think that
> >> every
> >> >>>>> factory needs a schema validator. Ideally, the factory should just
> >> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
> >> >>>>> validation logic as mentioned in the validation part of
> FLIP-54[1].
> >> But
> >> >>>>> currently our config options are not rich enough to have a unified
> >> >>>>> validation. Additionally, the factory should return some
> properties
> >> >>>>> such
> >> >>>>> as "supports event-time" for the schema validation outside of the
> >> >>>>> factory itself.
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>> Timo
> >> >>>>>
> >> >>>>> [1]
> >> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> >> >>>>>> Hi Jingsong,
> >> >>>>>>
> >> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm
> >> more
> >> >>>>>> familiar with them. I agree these are shortcomings of the current
> >> >>>>> Flink SQL
> >> >>>>>> design.
> >> >>>>>>
> >> >>>>>> A couple comments on your 1st proposal:
> >> >>>>>>
> >> >>>>>> 1. Is it better to have explicit APIs like
> >> >>>>>> "createBatchTableSource(...)" and "createStreamingTableSource(...)"
> >> >>>>>> in TableSourceFactory (it would be similar for the sink factory) to
> >> >>>>>> let the planner decide which mode (streaming vs batch) of source
> >> >>>>>> should be instantiated? That way we don't need to always make
> >> >>>>>> connector developers handle an if-else on isStreamingMode.
> >> >>>>>> 2. I'm not sure of the benefits of having a CatalogTableContext
> >> >>>>>> class. The path, table, and config are fairly independent of each
> >> >>>>>> other, so why not pass the config in as a 3rd parameter, as in
> >> >>>>>> `createXxxTableSource(path, catalogTable, tableConfig)`?
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <
> >> jingsongl...@gmail.com>
> >> >>>>> wrote:
> >> >>>>>>
> >> >>>>>>> Hi dev,
> >> >>>>>>>
> >> >>>>>>> I'd like to kick off a discussion on the improvement of
> >> >>>>> TableSourceFactory
> >> >>>>>>> and TableSinkFactory.
> >> >>>>>>>
> >> >>>>>>> Motivation:
> >> >>>>>>> Now the main needs and problems are:
> >> >>>>>>> 1. Connectors can't get the TableConfig [1], and some behaviors
> >> >>>>>>> really need to be controlled by the user's table configuration.
> >> >>>>>>> In the era of catalogs, we can't put these configs in connector
> >> >>>>>>> properties, which is too inconvenient.
> >> >>>>>>> 2. Connectors can't know whether this is batch or stream
> >> >>>>>>> execution mode, but the sink implementations for batch and stream
> >> >>>>>>> are totally different. I understand there is an update-mode
> >> >>>>>>> property now, but it splits batch and stream in the catalog
> >> >>>>>>> dimension. In fact, this information can be obtained through the
> >> >>>>>>> current TableEnvironment.
> >> >>>>>>> 3. There is no interface to call validation. Our validation now
> >> >>>>>>> lives in util classes, so it depends on whether or not the
> >> >>>>>>> connector calls them. We have some new validations to add, such
> >> >>>>>>> as [2], which really confuses users and even developers. Another
> >> >>>>>>> problem is that our SQL DDL does not have validation [3]. It is
> >> >>>>>>> better to report an error when executing DDL; otherwise it will
> >> >>>>>>> confuse the user.
> >> >>>>>>>
> >> >>>>>>> Proposed change draft for 1 and 2:
> >> >>>>>>>
> >> >>>>>>> interface CatalogTableContext {
> >> >>>>>>>      ObjectPath getTablePath();
> >> >>>>>>>      CatalogTable getTable();
> >> >>>>>>>      ReadableConfig getTableConfig();
> >> >>>>>>>      boolean isStreamingMode();
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> >> >>>>>>>
> >> >>>>>>>      default TableSource<T> createTableSource(CatalogTableContext context) {
> >> >>>>>>>         return createTableSource(context.getTablePath(),
> >> >>>>>>>               context.getTable());
> >> >>>>>>>      }
> >> >>>>>>>
> >> >>>>>>>      ......
> >> >>>>>>> }
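The backward-compatibility idea in this draft (a new default method delegating to the old one) can be sketched in a self-contained way. All types here are simplified stand-ins for illustration, not the real Flink classes:

```java
public class CompatSketch {

    // Simplified stand-in for the proposed CatalogTableContext.
    interface CatalogTableContext {
        String getTablePath(); // stand-in for ObjectPath
        String getTable();     // stand-in for CatalogTable
    }

    interface TableSourceFactory {
        // The method existing connectors already implement.
        String createTableSource(String tablePath, String table);

        // The new context-based method; its default delegates to the old
        // one, so existing factories keep working without any change.
        default String createTableSource(CatalogTableContext context) {
            return createTableSource(context.getTablePath(), context.getTable());
        }
    }

    public static void main(String[] args) {
        // An "old-style" factory that only implements the two-argument method.
        TableSourceFactory legacy =
                (path, table) -> "source(" + path + ", " + table + ")";

        CatalogTableContext ctx = new CatalogTableContext() {
            public String getTablePath() { return "db.t"; }
            public String getTable() { return "myCatalogTable"; }
        };
        // The legacy factory is invoked through the new entry point.
        System.out.println(legacy.createTableSource(ctx)); // prints "source(db.t, myCatalogTable)"
    }
}
```

The design choice this illustrates: the planner can always call the context-based method, while connectors migrate to it at their own pace.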
> >> >>>>>>>
> >> >>>>>>> Proposed change draft for 3:
> >> >>>>>>>
> >> >>>>>>> public interface TableFactory {
> >> >>>>>>>
> >> >>>>>>>      TableValidators validators();
> >> >>>>>>>
> >> >>>>>>>      interface TableValidators {
> >> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> >> >>>>>>>         TableSchemaValidator schemaValidator();
> >> >>>>>>>         FormatDescriptorValidator formatValidator();
> >> >>>>>>>      }
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> What do you think?
> >> >>>>>>>
> >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> >> >>>>>>> [2]
> >> >>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Jingsong Lee
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>> --
> >> >>>> Best, Jingsong Lee
> >> >>>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> Best, Jingsong Lee
> >> >>>
> >> >>
> >> >>
> >> >> --
> >> >> Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best regards!
Rui Li
