+1, thanks for the efforts.

On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> Hi all,
>
> As Jark suggested in the VOTE thread, I have created a JIRA issue:
> https://issues.apache.org/jira/browse/FLINK-15912
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>
> > Hi Timo,
> >
> > Good catch!
> >
> > I really love idea 2; a full Flink config looks very good to me.
> >
> > Regarding your first point: we actually don't have a `TableIdentifier`
> > class right now, but the TableFactory already indicates that this is a
> > table. So I am OK with it.
> >
> > The new Context should be:
> >
> > /**
> >  * Context of table source creation. Contains table information and
> >  * environment information.
> >  */
> > interface Context {
> >
> >     /**
> >      * @return full identifier of the given {@link CatalogTable}.
> >      */
> >     ObjectIdentifier getObjectIdentifier();
> >
> >     /**
> >      * @return table {@link CatalogTable} instance.
> >      */
> >     CatalogTable getTable();
> >
> >     /**
> >      * @return readable config of this table environment.
> >      */
> >     ReadableConfig getConfiguration();
> > }
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Jingsong,
> >>
> >> some last-minute changes from my side:
> >>
> >> 1. Rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> >> obvious. Otherwise people would expect a `TableIdentifier` class to be
> >> returned here.
> >>
> >> 2. Rename `getTableConfig` to `getConfiguration()`, because in the future
> >> this will not only be a "table" config but might give access to the full
> >> Flink config.
> >>
> >> Thanks,
> >> Timo
> >>
> >> On 04.02.20 06:27, Jingsong Li wrote:
> >> > So the interface will be:
> >> >
> >> > public interface TableSourceFactory<T> extends TableFactory {
> >> >
> >> >     ......
> >> >
> >> >     /**
> >> >      * Creates and configures a {@link TableSource} based on the given
> >> >      * {@link Context}.
> >> >      *
> >> >      * @param context context of this table source.
> >> >      * @return the configured table source.
> >> >      */
> >> >     default TableSource<T> createTableSource(Context context) {
> >> >         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >         return createTableSource(
> >> >                 new ObjectPath(tableIdentifier.getDatabaseName(),
> >> >                         tableIdentifier.getObjectName()),
> >> >                 context.getTable());
> >> >     }
> >> >
> >> >     /**
> >> >      * Context of table source creation. Contains table information and
> >> >      * environment information.
> >> >      */
> >> >     interface Context {
> >> >
> >> >         /**
> >> >          * @return full identifier of the given {@link CatalogTable}.
> >> >          */
> >> >         ObjectIdentifier getTableIdentifier();
> >> >
> >> >         /**
> >> >          * @return table {@link CatalogTable} instance.
> >> >          */
> >> >         CatalogTable getTable();
> >> >
> >> >         /**
> >> >          * @return readable config of this table environment.
> >> >          */
> >> >         ReadableConfig getTableConfig();
> >> >     }
> >> > }
> >> >
> >> > public interface TableSinkFactory<T> extends TableFactory {
> >> >
> >> >     ......
> >> >
> >> >     /**
> >> >      * Creates and configures a {@link TableSink} based on the given
> >> >      * {@link Context}.
> >> >      *
> >> >      * @param context context of this table sink.
> >> >      * @return the configured table sink.
> >> >      */
> >> >     default TableSink<T> createTableSink(Context context) {
> >> >         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >         return createTableSink(
> >> >                 new ObjectPath(tableIdentifier.getDatabaseName(),
> >> >                         tableIdentifier.getObjectName()),
> >> >                 context.getTable());
> >> >     }
> >> >
> >> >     /**
> >> >      * Context of table sink creation. Contains table information and
> >> >      * environment information.
> >> >      */
> >> >     interface Context {
> >> >
> >> >         /**
> >> >          * @return full identifier of the given {@link CatalogTable}.
> >> >          */
> >> >         ObjectIdentifier getTableIdentifier();
> >> >
> >> >         /**
> >> >          * @return table {@link CatalogTable} instance.
> >> >          */
> >> >         CatalogTable getTable();
> >> >
> >> >         /**
> >> >          * @return readable config of this table environment.
> >> >          */
> >> >         ReadableConfig getTableConfig();
> >> >     }
> >> > }
> >> >
> >> > Best,
> >> > Jingsong Lee
> >> >
> >> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >> >
> >> >> Hi all,
> >> >>
> >> >> After rethinking and discussing with Kurt, I'd like to remove
> >> >> "isBounded". We can defer the boundedness information to the TableSink
> >> >> itself: when refactoring TableSink, we need to consider
> >> >> "consumeDataStream" and "consumeBoundedStream".
> >> >>
> >> >> Best,
> >> >> Jingsong Lee
> >> >>
> >> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >> >>
> >> >>> Hi Jark,
> >> >>>
> >> >>> Thanks for getting involved. Yes, it's hard to understand why isBounded
> >> >>> would be added to the source. I recommend adding it only to the sink for
> >> >>> now, because the sink has an upstream, and that upstream is either
> >> >>> bounded or unbounded.
> >> >>>
> >> >>> Hi all,
> >> >>>
> >> >>> Let me summarize based on your suggestions.
> >> >>>
> >> >>> public interface TableSourceFactory<T> extends TableFactory {
> >> >>>
> >> >>>     ......
> >> >>>
> >> >>>     /**
> >> >>>      * Creates and configures a {@link TableSource} based on the given {@link Context}.
> >> >>>      *
> >> >>>      * @param context context of this table source.
> >> >>>      * @return the configured table source.
> >> >>>      */
> >> >>>     default TableSource<T> createTableSource(Context context) {
> >> >>>         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >>>         return createTableSource(
> >> >>>                 new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
> >> >>>                 context.getTable());
> >> >>>     }
> >> >>>
> >> >>>     /**
> >> >>>      * Context of table source creation. Contains table information and environment information.
> >> >>>      */
> >> >>>     interface Context {
> >> >>>
> >> >>>         /**
> >> >>>          * @return full identifier of the given {@link CatalogTable}.
> >> >>>          */
> >> >>>         ObjectIdentifier getTableIdentifier();
> >> >>>
> >> >>>         /**
> >> >>>          * @return table {@link CatalogTable} instance.
> >> >>>          */
> >> >>>         CatalogTable getTable();
> >> >>>
> >> >>>         /**
> >> >>>          * @return readable config of this table environment.
> >> >>>          */
> >> >>>         ReadableConfig getTableConfig();
> >> >>>     }
> >> >>> }
> >> >>>
> >> >>> public interface TableSinkFactory<T> extends TableFactory {
> >> >>>
> >> >>>     ......
> >> >>>
> >> >>>     /**
> >> >>>      * Creates and configures a {@link TableSink} based on the given {@link Context}.
> >> >>>      *
> >> >>>      * @param context context of this table sink.
> >> >>>      * @return the configured table sink.
> >> >>>      */
> >> >>>     default TableSink<T> createTableSink(Context context) {
> >> >>>         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >> >>>         return createTableSink(
> >> >>>                 new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
> >> >>>                 context.getTable());
> >> >>>     }
> >> >>>
> >> >>>     /**
> >> >>>      * Context of table sink creation. Contains table information and environment information.
> >> >>>      */
> >> >>>     interface Context {
> >> >>>
> >> >>>         /**
> >> >>>          * @return full identifier of the given {@link CatalogTable}.
> >> >>>          */
> >> >>>         ObjectIdentifier getTableIdentifier();
> >> >>>
> >> >>>         /**
> >> >>>          * @return table {@link CatalogTable} instance.
> >> >>>          */
> >> >>>         CatalogTable getTable();
> >> >>>
> >> >>>         /**
> >> >>>          * @return readable config of this table environment.
> >> >>>          */
> >> >>>         ReadableConfig getTableConfig();
> >> >>>
> >> >>>         /**
> >> >>>          * @return whether or not the input is bounded.
> >> >>>          */
> >> >>>         boolean isBounded();
> >> >>>     }
> >> >>> }
> >> >>>
> >> >>> If there is no objection, I will start a vote thread (if necessary, I
> >> >>> can also write a FLIP).
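The bridging idea in the interfaces above (a new `default` method that takes a `Context` and delegates to the legacy two-argument method) can be illustrated with a small self-contained sketch. It assumes heavily simplified, hypothetical stand-in types (`Identifier`, `Path`, `Table`, `SourceFactory`) rather than Flink's real `ObjectIdentifier`, `ObjectPath`, `CatalogTable` and `TableSourceFactory`; it only shows why connectors written against the old method keep working without changes.

```java
import java.util.Map;

// Hypothetical stand-ins for ObjectIdentifier, ObjectPath and CatalogTable.
// They are NOT Flink's classes and only carry what this sketch needs.
final class Identifier {
    final String catalog;
    final String database;
    final String object;

    Identifier(String catalog, String database, String object) {
        this.catalog = catalog;
        this.database = database;
        this.object = object;
    }
}

final class Path {
    final String database;
    final String object;

    Path(String database, String object) {
        this.database = database;
        this.object = object;
    }

    @Override
    public String toString() {
        return database + "." + object;
    }
}

final class Table {
    final Map<String, String> options;

    Table(Map<String, String> options) {
        this.options = options;
    }
}

interface SourceFactory<T> {

    // Context bound to the factory as an inner interface.
    interface Context {
        Identifier getTableIdentifier();

        Table getTable();
    }

    // Legacy entry point that existing connectors already implement.
    T createTableSource(Path path, Table table);

    // New entry point: the default implementation delegates to the legacy
    // method. Converting the identifier to a path drops the catalog name.
    default T createTableSource(Context context) {
        Identifier id = context.getTableIdentifier();
        return createTableSource(new Path(id.database, id.object), context.getTable());
    }
}

// A connector that only knows the old two-argument method.
final class LegacyFactory implements SourceFactory<String> {
    @Override
    public String createTableSource(Path path, Table table) {
        return "source(" + path + ", connector=" + table.options.get("connector") + ")";
    }
}

final class BridgeDemo {
    // The caller (in Flink: the planner) only uses the Context-based method.
    static String run() {
        SourceFactory.Context ctx = new SourceFactory.Context() {
            public Identifier getTableIdentifier() {
                return new Identifier("hive", "db1", "orders");
            }

            public Table getTable() {
                return new Table(Map.of("connector", "filesystem"));
            }
        };
        return new LegacyFactory().createTableSource(ctx);
    }
}
```

Calling `BridgeDemo.run()` goes through the default method and reaches the legacy implementation. Note that the catalog name (`hive`) never reaches the connector, which matches the point raised later in the thread that a factory needing the location should receive an `ObjectIdentifier` so that it also knows the catalog.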
> >> >>>
> >> >>> Best,
> >> >>> Jingsong Lee
> >> >>>
> >> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >> >>>
> >> >>>> Thanks Bowen and Timo for getting involved.
> >> >>>>
> >> >>>> Hi Bowen,
> >> >>>>
> >> >>>>> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
> >> >>>> I think it is better to keep one method: in [1], we reached consensus in
> >> >>>> the DataStream layer to maintain a single API in "env.source". I think it
> >> >>>> is good not to split batch and stream, and our TableSource/TableSink are
> >> >>>> the same classes for both batch and streaming too.
> >> >>>>
> >> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> >> >>>> As Timo said, we may have more parameters to add in the future. Take a
> >> >>>> look at "AbstractRichFunction.RuntimeContext": it has grown little by
> >> >>>> little.
> >> >>>>
> >> >>>> Hi Timo,
> >> >>>>
> >> >>>> Your suggestion about Context looks good to me. The "TablePath" is used
> >> >>>> in Hive to update the catalog information of the table. Yes,
> >> >>>> "ObjectIdentifier" looks better than "ObjectPath".
> >> >>>>
> >> >>>>> Can we postpone the change of TableValidators?
> >> >>>> Yes, ConfigOption validation looks good to me. It seems that you have
> >> >>>> been thinking about this for a long time, which is great. Looking
> >> >>>> forward to the progress of FLIP-54.
> >> >>>>
> >> >>>> [1]
> >> >>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> >> >>>>
> >> >>>> Best,
> >> >>>> Jingsong Lee
> >> >>>>
> >> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
> >> >>>>
> >> >>>>> Hi Jingsong,
> >> >>>>>
> >> >>>>> +1 for adding a context in the source and sink factories. A context
> >> >>>>> class also allows for future modifications without touching the
> >> >>>>> TableFactory interface again.
> >> >>>>>
> >> >>>>> How about:
> >> >>>>>
> >> >>>>> interface TableSourceFactory {
> >> >>>>>     interface Context {
> >> >>>>>         // ...
> >> >>>>>     }
> >> >>>>> }
> >> >>>>>
> >> >>>>> I suggest this because I find the name `CatalogTableContext` confusing,
> >> >>>>> and this way we can bind the interface to the factory class itself as an
> >> >>>>> inner interface.
> >> >>>>>
> >> >>>>> Readable access to configuration also sounds right to me. Can we remove
> >> >>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
> >> >>>>> factory should know the path. And if it does, it should be an
> >> >>>>> `ObjectIdentifier` instead, so that it also knows about the catalog we
> >> >>>>> are using.
> >> >>>>>
> >> >>>>> `isStreamingMode()` should be renamed to `isBounded()` because we would
> >> >>>>> like to use terminology around boundedness rather than streaming/batch.
> >> >>>>>
> >> >>>>> @Bowen: We are in the process of unifying the APIs and thus explicitly
> >> >>>>> want to avoid specialized methods in the future.
> >> >>>>>
> >> >>>>> Can we postpone the change of TableValidators? I don't think that every
> >> >>>>> factory needs a schema validator. Ideally, the factory should just
> >> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
> >> >>>>> validation logic as mentioned in the validation part of FLIP-54 [1].
> >> >>>>> But currently our config options are not rich enough to have a unified
> >> >>>>> validation. Additionally, the factory should return some properties
> >> >>>>> such as "supports event-time" for the schema validation outside of the
> >> >>>>> factory itself.
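The idea of a factory that only declares its options, with each option carrying its own validation logic so the framework can validate generically (for example when a DDL statement is executed) instead of relying on the connector to call utility classes, might look roughly like the following. This is a minimal sketch under stated assumptions: `CheckedOption`, `OptionDeclaringFactory`, `OptionValidator`, `FileSinkFactoryExample` and the option keys are hypothetical names invented for illustration, not Flink APIs, and Flink's actual `ConfigOption` does not carry a validator of this kind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical option type: a key plus the validation logic attached to it.
// Not Flink's ConfigOption; it only illustrates the FLIP-54 direction.
final class CheckedOption {
    final String key;
    final boolean required;
    final Predicate<String> isValid;
    final String expectation;

    CheckedOption(String key, boolean required, Predicate<String> isValid, String expectation) {
        this.key = key;
        this.required = required;
        this.isValid = isValid;
        this.expectation = expectation;
    }
}

// Hypothetical factory contract: the factory declares its options instead of
// returning dedicated validator objects.
interface OptionDeclaringFactory {
    List<CheckedOption> options();
}

final class OptionValidator {
    // Generic validation driven by the framework. Returns one message per
    // problem; an empty list means the properties are valid.
    static List<String> validate(OptionDeclaringFactory factory, Map<String, String> props) {
        List<String> errors = new ArrayList<>();
        for (CheckedOption option : factory.options()) {
            String value = props.get(option.key);
            if (value == null) {
                if (option.required) {
                    errors.add("missing required option '" + option.key + "'");
                }
            } else if (!option.isValid.test(value)) {
                errors.add("invalid value '" + value + "' for '" + option.key
                        + "', expected " + option.expectation);
            }
        }
        return errors;
    }
}

// Hypothetical connector factory with illustrative option keys.
final class FileSinkFactoryExample implements OptionDeclaringFactory {
    @Override
    public List<CheckedOption> options() {
        return List.of(
                new CheckedOption("path", true, v -> !v.isEmpty(), "a non-empty path"),
                new CheckedOption("sink.parallelism", false,
                        v -> v.matches("[1-9][0-9]*"), "a positive integer"));
    }
}
```

With this shape the framework, rather than each connector, decides when validation runs, so an invalid table definition could be reported as soon as the DDL is executed. Schema-level capabilities such as "supports event-time" are deliberately left out of this sketch.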
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>> Timo
> >> >>>>>
> >> >>>>> [1]
> >> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >> >>>>>
> >> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> >> >>>>>> Hi Jingsong,
> >> >>>>>>
> >> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm more
> >> >>>>>> familiar with them. I agree these are shortcomings of the current
> >> >>>>>> Flink SQL design.
> >> >>>>>>
> >> >>>>>> A couple of comments on your 1st proposal:
> >> >>>>>>
> >> >>>>>> 1. Is it better to have explicit APIs like "createBatchTableSource(...)"
> >> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory (and
> >> >>>>>> similarly for the sink factory) to let the planner decide which mode
> >> >>>>>> (streaming vs. batch) of source should be instantiated? That way we
> >> >>>>>> don't always need connector developers to handle an if-else on
> >> >>>>>> isStreamingMode.
> >> >>>>>> 2. I'm not sure of the benefits of having a CatalogTableContext class.
> >> >>>>>> The path, table, and config are fairly independent of each other. So
> >> >>>>>> why not pass the config in as a 3rd parameter, as in
> >> >>>>>> `createXxxTableSource(path, catalogTable, tableConfig)`?
> >> >>>>>>
> >> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>>> Hi dev,
> >> >>>>>>>
> >> >>>>>>> I'd like to kick off a discussion on improving TableSourceFactory
> >> >>>>>>> and TableSinkFactory.
> >> >>>>>>>
> >> >>>>>>> Motivation:
> >> >>>>>>> The main needs and problems right now are:
> >> >>>>>>> 1. Connectors can't get the TableConfig [1], yet some behaviors really
> >> >>>>>>> need to be controlled by the user's table configuration. In the era of
> >> >>>>>>> catalogs, we can't put these configs in connector properties; that is
> >> >>>>>>> too inconvenient.
> >> >>>>>>> 2. Connectors can't know whether the execution mode is batch or
> >> >>>>>>> streaming, but the batch and streaming sink implementations are totally
> >> >>>>>>> different. I understand there is an update mode property now, but it
> >> >>>>>>> splits batch and streaming along the catalog dimension. In fact, this
> >> >>>>>>> information can be obtained through the current TableEnvironment.
> >> >>>>>>> 3. There is no interface to invoke validation. Currently validation
> >> >>>>>>> lives mostly in utility classes, and whether it runs depends on whether
> >> >>>>>>> the connector calls them. We have some new validations to add, such as
> >> >>>>>>> [2], which really confuse users and even developers. Another problem is
> >> >>>>>>> that our SQL update (DDL) has no validation [3]. It is better to report
> >> >>>>>>> an error when executing the DDL; otherwise the user will be confused.
> >> >>>>>>>
> >> >>>>>>> Proposed change draft for 1 and 2:
> >> >>>>>>>
> >> >>>>>>> interface CatalogTableContext {
> >> >>>>>>>     ObjectPath getTablePath();
> >> >>>>>>>     CatalogTable getTable();
> >> >>>>>>>     ReadableConfig getTableConfig();
> >> >>>>>>>     boolean isStreamingMode();
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> >> >>>>>>>
> >> >>>>>>>     default TableSource<T> createTableSource(CatalogTableContext context) {
> >> >>>>>>>         return createTableSource(context.getTablePath(), context.getTable());
> >> >>>>>>>     }
> >> >>>>>>>
> >> >>>>>>>     ......
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> Proposed change draft for 3:
> >> >>>>>>>
> >> >>>>>>> public interface TableFactory {
> >> >>>>>>>
> >> >>>>>>>     TableValidators validators();
> >> >>>>>>>
> >> >>>>>>>     interface TableValidators {
> >> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> >> >>>>>>>         TableSchemaValidator schemaValidator();
> >> >>>>>>>         FormatDescriptorValidator formatValidator();
> >> >>>>>>>     }
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> What do you think?
> >> >>>>>>>
> >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> >> >>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Jingsong Lee
> >> >>>>>>>
> >> >>>>
> >> >>>> --
> >> >>>> Best, Jingsong Lee
> >> >>>>
> >> >>>
> >> >>> --
> >> >>> Best, Jingsong Lee
> >> >>>
> >> >>
> >> >> --
> >> >> Best, Jingsong Lee
> >> >>
> >> >
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>
> --
> Best, Jingsong Lee
>

--
Best regards!
Rui Li
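The first motivation point of the original proposal (connectors need settings from the table environment, not only connector properties stored in the catalog) is what `getConfiguration()` on the agreed `Context` is meant to address. The sketch below illustrates the idea with hypothetical stand-ins: `EnvConfig` stands in for `ReadableConfig`, `SinkContext` mirrors the final `Context` shape using plain strings and maps, and the key `example.sink.buffer-size` is invented for illustration. None of these are Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only view of the table environment's configuration.
// Stand-in for ReadableConfig; NOT that interface.
final class EnvConfig {
    private final Map<String, String> values;

    EnvConfig(Map<String, String> values) {
        this.values = new HashMap<>(values);
    }

    // Returns the configured integer, or the default if the key is absent.
    int getInt(String key, int defaultValue) {
        String v = values.get(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }
}

// Mirrors the agreed Context shape (identifier, table, configuration).
interface SinkContext {
    String getObjectIdentifier();          // e.g. "catalog.database.table"

    Map<String, String> getTableOptions(); // connector properties from the catalog

    EnvConfig getConfiguration();          // settings of the table environment
}

final class ExampleSink {
    final String path;
    final int bufferSize;

    ExampleSink(String path, int bufferSize) {
        this.path = path;
        this.bufferSize = bufferSize;
    }
}

final class ExampleSinkFactory {
    // Hypothetical session-wide key that should not have to be repeated in
    // every table's connector properties.
    static final String BUFFER_SIZE_KEY = "example.sink.buffer-size";

    ExampleSink createTableSink(SinkContext context) {
        // Table-specific setting comes from the catalog table...
        String path = context.getTableOptions().get("path");
        // ...while the behavior switch comes from the environment config.
        int bufferSize = context.getConfiguration().getInt(BUFFER_SIZE_KEY, 1024);
        return new ExampleSink(path, bufferSize);
    }
}

final class ContextDemo {
    // Builds a context for one fixed table with the given environment settings.
    static SinkContext context(Map<String, String> env) {
        return new SinkContext() {
            public String getObjectIdentifier() {
                return "hive.db1.orders";
            }

            public Map<String, String> getTableOptions() {
                return Map.of("path", "/warehouse/orders");
            }

            public EnvConfig getConfiguration() {
                return new EnvConfig(env);
            }
        };
    }
}
```

For example, `new ExampleSinkFactory().createTableSink(ContextDemo.context(Map.of()))` falls back to a buffer size of 1024, while setting `example.sink.buffer-size` in the environment changes it for every table without touching the catalog.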