Hi all,

As Jark suggested in the VOTE thread, I have created a JIRA:
https://issues.apache.org/jira/browse/FLINK-15912
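
For anyone following along, here is a minimal sketch of how a factory might consume the final Context quoted below. It is only an illustration, not the implementation: ContextSketch, contextOf, describeSource and the "example.*" keys are hypothetical, and plain Maps stand in for CatalogTable and ReadableConfig so that it compiles without Flink on the classpath.

```java
import java.util.Map;

/** Sketch only: stand-in types so the example compiles without Flink. */
public class ContextSketch {

    /** Hypothetical stand-in for Flink's ObjectIdentifier. */
    static final class ObjectIdentifier {
        final String catalogName;
        final String databaseName;
        final String objectName;

        ObjectIdentifier(String catalogName, String databaseName, String objectName) {
            this.catalogName = catalogName;
            this.databaseName = databaseName;
            this.objectName = objectName;
        }
    }

    /** Mirrors the Context from this thread; the Maps replace CatalogTable and ReadableConfig. */
    interface Context {
        ObjectIdentifier getObjectIdentifier();
        Map<String, String> getTable();          // stand-in: table properties only
        Map<String, String> getConfiguration();  // stand-in: read-only config
    }

    /** Hypothetical helper that builds a Context from plain values. */
    static Context contextOf(
            ObjectIdentifier id, Map<String, String> table, Map<String, String> config) {
        return new Context() {
            @Override public ObjectIdentifier getObjectIdentifier() { return id; }
            @Override public Map<String, String> getTable() { return table; }
            @Override public Map<String, String> getConfiguration() { return config; }
        };
    }

    /** What a factory might do: combine per-table properties with environment-wide config. */
    static String describeSource(Context context) {
        ObjectIdentifier id = context.getObjectIdentifier();
        // Hypothetical keys; a missing key falls back to a default value.
        String path = context.getTable().getOrDefault("example.path", "<none>");
        String parallelism = context.getConfiguration().getOrDefault("example.parallelism", "1");
        return id.catalogName + "." + id.databaseName + "." + id.objectName
                + " path=" + path + " parallelism=" + parallelism;
    }

    public static void main(String[] args) {
        Context context = contextOf(
                new ObjectIdentifier("my_catalog", "my_db", "my_table"),
                Map.of("example.path", "/tmp/data"),
                Map.of("example.parallelism", "4"));
        System.out.println(describeSource(context));
    }
}
```

The point of the sketch is that the factory reads everything through one Context object, so new accessors can be added later without changing the factory method signature.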

Best,
Jingsong Lee

On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Timo,
>
> Good catch!
>
> I really like idea 2; a full Flink config looks very good to me.
>
> As for your first point: we actually don't have a `TableIdentifier`
> class now, but TableFactory already indicates "table", so I am OK with it.
>
> New Context should be:
>
>    /**
>     * Context of table source creation. Contains table information and
>     * environment information.
>     */
>    interface Context {
>       /**
>        * @return full identifier of the given {@link CatalogTable}.
>        */
>       ObjectIdentifier getObjectIdentifier();
>       /**
>        * @return table {@link CatalogTable} instance.
>        */
>       CatalogTable getTable();
>       /**
>        * @return readable config of this table environment.
>        */
>       ReadableConfig getConfiguration();
>    }
>
>
> Best,
> Jingsong Lee
>
> On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Jingsong,
>>
>> some last minute changes from my side:
>>
>> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
>> obvious. Otherwise people expect a `TableIdentifier` class being
>> returned here.
>>
>> 2. rename `getTableConfig` to `getConfiguration`; in the future this
>> will not only be a "table" config but might give access to the full
>> Flink config
>>
>> Thanks,
>> Timo
>>
>>
>> On 04.02.20 06:27, Jingsong Li wrote:
>> > So the interface will be:
>> >
>> > public interface TableSourceFactory<T> extends TableFactory {
>> >     ......
>> >
>> >     /**
>> >      * Creates and configures a {@link TableSource} based on the given
>> >      * {@link Context}.
>> >      *
>> >      * @param context context of this table source.
>> >      * @return the configured table source.
>> >      */
>> >     default TableSource<T> createTableSource(Context context) {
>> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >        return createTableSource(
>> >              new ObjectPath(
>> >                    tableIdentifier.getDatabaseName(),
>> >                    tableIdentifier.getObjectName()),
>> >              context.getTable());
>> >     }
>> >     /**
>> >      * Context of table source creation. Contains table information and
>> >      * environment information.
>> >      */
>> >     interface Context {
>> >        /**
>> >         * @return full identifier of the given {@link CatalogTable}.
>> >         */
>> >        ObjectIdentifier getTableIdentifier();
>> >        /**
>> >         * @return table {@link CatalogTable} instance.
>> >         */
>> >        CatalogTable getTable();
>> >        /**
>> >         * @return readable config of this table environment.
>> >         */
>> >        ReadableConfig getTableConfig();
>> >     }
>> > }
>> >
>> > public interface TableSinkFactory<T> extends TableFactory {
>> >     ......
>> >     /**
>> >      * Creates and configures a {@link TableSink} based on the given
>> >      * {@link Context}.
>> >      *
>> >      * @param context context of this table sink.
>> >      * @return the configured table sink.
>> >      */
>> >     default TableSink<T> createTableSink(Context context) {
>> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >        return createTableSink(
>> >              new ObjectPath(
>> >                    tableIdentifier.getDatabaseName(),
>> >                    tableIdentifier.getObjectName()),
>> >              context.getTable());
>> >     }
>> >     /**
>> >      * Context of table sink creation. Contains table information and
>> >      * environment information.
>> >      */
>> >     interface Context {
>> >        /**
>> >         * @return full identifier of the given {@link CatalogTable}.
>> >         */
>> >        ObjectIdentifier getTableIdentifier();
>> >        /**
>> >         * @return table {@link CatalogTable} instance.
>> >         */
>> >        CatalogTable getTable();
>> >        /**
>> >         * @return readable config of this table environment.
>> >         */
>> >        ReadableConfig getTableConfig();
>> >     }
>> > }
>> >
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >
>> >> Hi all,
>> >>
>> >> After rethinking this and discussing it with Kurt, I'd like to remove
>> >> "isBounded". We can defer the boundedness information to TableSink:
>> >> with the TableSink refactoring, we need to consider "consumeDataStream"
>> >> and "consumeBoundedStream".
>> >>
>> >> Best,
>> >> Jingsong Lee
>> >>
>> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >>
>> >>> Hi Jark,
>> >>>
>> >>> Thanks for getting involved. Yes, it's hard to understand adding
>> >>> isBounded to the source.
>> >>> I recommend adding it only to the sink for now, because a sink has an
>> >>> upstream, and that upstream is either bounded or unbounded.
>> >>>
>> >>> Hi all,
>> >>>
>> >>> Let me summarize, incorporating your suggestions.
>> >>>
>> >>> public interface TableSourceFactory<T> extends TableFactory {
>> >>>
>> >>>     ......
>> >>>
>> >>>
>> >>>     /**
>> >>>      * Creates and configures a {@link TableSource} based on the
>> >>>      * given {@link Context}.
>> >>>      *
>> >>>      * @param context context of this table source.
>> >>>      * @return the configured table source.
>> >>>      */
>> >>>     default TableSource<T> createTableSource(Context context) {
>> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >>>        return createTableSource(
>> >>>              new ObjectPath(
>> >>>                    tableIdentifier.getDatabaseName(),
>> >>>                    tableIdentifier.getObjectName()),
>> >>>              context.getTable());
>> >>>     }
>> >>>
>> >>>     /**
>> >>>      * Context of table source creation. Contains table information
>> >>>      * and environment information.
>> >>>      */
>> >>>     interface Context {
>> >>>
>> >>>        /**
>> >>>         * @return full identifier of the given {@link CatalogTable}.
>> >>>         */
>> >>>        ObjectIdentifier getTableIdentifier();
>> >>>
>> >>>        /**
>> >>>         * @return table {@link CatalogTable} instance.
>> >>>         */
>> >>>        CatalogTable getTable();
>> >>>
>> >>>        /**
>> >>>         * @return readable config of this table environment.
>> >>>         */
>> >>>        ReadableConfig getTableConfig();
>> >>>     }
>> >>> }
>> >>>
>> >>> public interface TableSinkFactory<T> extends TableFactory {
>> >>>
>> >>>     ......
>> >>>
>> >>>     /**
>> >>>      * Creates and configures a {@link TableSink} based on the given
>> >>>      * {@link Context}.
>> >>>      *
>> >>>      * @param context context of this table sink.
>> >>>      * @return the configured table sink.
>> >>>      */
>> >>>     default TableSink<T> createTableSink(Context context) {
>> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >>>        return createTableSink(
>> >>>              new ObjectPath(
>> >>>                    tableIdentifier.getDatabaseName(),
>> >>>                    tableIdentifier.getObjectName()),
>> >>>              context.getTable());
>> >>>     }
>> >>>
>> >>>     /**
>> >>>      * Context of table sink creation. Contains table information and
>> >>>      * environment information.
>> >>>      */
>> >>>     interface Context {
>> >>>
>> >>>        /**
>> >>>         * @return full identifier of the given {@link CatalogTable}.
>> >>>         */
>> >>>        ObjectIdentifier getTableIdentifier();
>> >>>
>> >>>        /**
>> >>>         * @return table {@link CatalogTable} instance.
>> >>>         */
>> >>>        CatalogTable getTable();
>> >>>
>> >>>        /**
>> >>>         * @return readable config of this table environment.
>> >>>         */
>> >>>        ReadableConfig getTableConfig();
>> >>>
>> >>>        /**
>> >>>         * @return whether the input is bounded.
>> >>>         */
>> >>>        boolean isBounded();
>> >>>     }
>> >>> }
>> >>>
>> >>> If there is no objection, I will start a vote thread. (if necessary, I
>> >>> can also edit a FLIP).
>> >>>
>> >>> Best,
>> >>> Jingsong Lee
>> >>>
>> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Thanks Bowen and Timo for getting involved.
>> >>>>
>> >>>> Hi Bowen,
>> >>>>
>> >>>>> 1. is it better to have explicit APIs like
>> >>>>> "createBatchTableSource(...)"
>> >>>> I think it is better to keep one method, since in [1] we have reached
>> >>>> an agreement in the DataStream layer to maintain a single API in
>> >>>> "env.source". I think it is good not to split batch and stream, and our
>> >>>> TableSource/TableSink are the same classes for both batch and streaming
>> >>>> too.
>> >>>>
>> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
>> >>>> As Timo said, we may have more parameters to add in the future. Take a
>> >>>> look at "AbstractRichFunction.RuntimeContext": it was extended little by
>> >>>> little.
>> >>>>
>> >>>> Hi Timo,
>> >>>>
>> >>>> Your suggestion about Context looks good to me.
>> >>>> "TablePath" is used in Hive for updating the catalog information of this
>> >>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>> >>>>
>> >>>>> Can we postpone the change of TableValidators?
>> >>>> Yes, ConfigOption validation looks good to me. It seems that you have
>> >>>> been thinking about this for a long time. It's very good. Looking forward
>> >>>> to the progress of FLIP-54.
>> >>>>
>> >>>> [1]
>> >>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>> >>>>
>> >>>> Best,
>> >>>> Jingsong Lee
>> >>>>
>> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
>> >>>>
>> >>>>> Hi Jingsong,
>> >>>>>
>> >>>>> +1 for adding a context in the source and sink factories. A context
>> >>>>> class also allows for future modifications without touching the
>> >>>>> TableFactory interface again.
>> >>>>>
>> >>>>> How about:
>> >>>>>
>> >>>>> interface TableSourceFactory {
>> >>>>>       interface Context {
>> >>>>>          // ...
>> >>>>>       }
>> >>>>> }
>> >>>>>
>> >>>>> Because I find the name `CatalogTableContext` confusing and we can
>> >>>>> bind the interface to the factory class itself as an inner interface.
>> >>>>>
>> >>>>> Readable access to configuration also sounds right to me. Can we remove
>> >>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>> >>>>> factory should know the path. And if so, it should be an
>> >>>>> `ObjectIdentifier` instead to also know about the catalog we are using.
>> >>>>>
>> >>>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>> >>>>> would like to use terminology around boundedness rather than
>> >>>>> streaming/batch.
>> >>>>>
>> >>>>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>> >>>>> avoid specialized methods in the future.
>> >>>>>
>> >>>>> Can we postpone the change of TableValidators? I don't think that every
>> >>>>> factory needs a schema validator. Ideally, the factory should just
>> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>> >>>>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>> >>>>> currently our config options are not rich enough to have a unified
>> >>>>> validation. Additionally, the factory should return some properties
>> >>>>> such as "supports event-time" for the schema validation outside of the
>> >>>>> factory itself.
>> >>>>>
>> >>>>> Regards,
>> >>>>> Timo
>> >>>>>
>> >>>>> [1]
>> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On 16.01.20 00:51, Bowen Li wrote:
>> >>>>>> Hi Jingsong,
>> >>>>>>
>> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm more
>> >>>>>> familiar with them. I agree these are shortcomings of the current
>> >>>>>> Flink SQL design.
>> >>>>>>
>> >>>>>> A couple comments on your 1st proposal:
>> >>>>>>
>> >>>>>> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory (would be
>> >>>>>> similar for sink factory) to let the planner handle which mode (streaming
>> >>>>>> vs batch) of source should be instantiated? That way we don't need to
>> >>>>>> always let connector developers handle an if-else on isStreamingMode.
>> >>>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class. The
>> >>>>>> path, table, and config are fairly independent of each other. So why not
>> >>>>>> pass the config in as 3rd parameter as `createXxxTableSource(path,
>> >>>>>> catalogTable, tableConfig)`?
>> >>>>>>
>> >>>>>>
>> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >>>>>>
>> >>>>>>> Hi dev,
>> >>>>>>>
>> >>>>>>> I'd like to kick off a discussion on the improvement of
>> >>>>>>> TableSourceFactory and TableSinkFactory.
>> >>>>>>>
>> >>>>>>> Motivation:
>> >>>>>>> Now the main needs and problems are:
>> >>>>>>> 1. Connector can't get TableConfig [1], and some behaviors really need
>> >>>>>>> to be controlled by the user's table configuration. In the era of
>> >>>>>>> catalog, we can't put these configs in connector properties, which is
>> >>>>>>> too inconvenient.
>> >>>>>>> 2. Connector can't know if this is batch or stream execution mode. But
>> >>>>>>> the sink implementation of batch and stream is totally different. I
>> >>>>>>> understand there is an update mode property now, but it splits the batch
>> >>>>>>> and stream in the catalog dimension. In fact, this information can be
>> >>>>>>> obtained through the current TableEnvironment.
>> >>>>>>> 3. No interface to call validation. Now our validation consists mostly of
>> >>>>>>> util classes, and it depends on whether or not the connector calls them.
>> >>>>>>> Now we have some new validations to add, such as [2], which really
>> >>>>>>> confuses users, even developers. Another problem is that our SQL update
>> >>>>>>> (DDL) does not have validation [3]. It is better to report an error when
>> >>>>>>> executing DDL, otherwise it will confuse the user.
>> >>>>>>>
>> >>>>>>> Proposed change draft for 1 and 2:
>> >>>>>>>
>> >>>>>>> interface CatalogTableContext {
>> >>>>>>>      ObjectPath getTablePath();
>> >>>>>>>      CatalogTable getTable();
>> >>>>>>>      ReadableConfig getTableConfig();
>> >>>>>>>      boolean isStreamingMode();
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
>> >>>>>>>
>> >>>>>>>      default TableSource<T> createTableSource(CatalogTableContext context) {
>> >>>>>>>         return createTableSource(context.getTablePath(), context.getTable());
>> >>>>>>>      }
>> >>>>>>>
>> >>>>>>>      ......
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> Proposed change draft for 3:
>> >>>>>>>
>> >>>>>>> public interface TableFactory {
>> >>>>>>>
>> >>>>>>>      TableValidators validators();
>> >>>>>>>
>> >>>>>>>      interface TableValidators {
>> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
>> >>>>>>>         TableSchemaValidator schemaValidator();
>> >>>>>>>         FormatDescriptorValidator formatValidator();
>> >>>>>>>      }
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> What do you think?
>> >>>>>>>
>> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
>> >>>>>>> [2]
>> >>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Jingsong Lee
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>> --
>> >>>> Best, Jingsong Lee
>> >>>>
>> >>>
>> >>>
>> >>> --
>> >>> Best, Jingsong Lee
>> >>>
>> >>
>> >>
>> >> --
>> >> Best, Jingsong Lee
>> >>
>> >
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee
