Hi all,

As Jark suggested in the VOTE thread, I have created a JIRA: https://issues.apache.org/jira/browse/FLINK-15912
Best,
Jingsong Lee

On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Timo,
>
> Good catch!
>
> I really love idea 2; a full Flink config looks very good to me.
>
> As I understand your first point, we actually don't have a `TableIdentifier`
> class now. But TableFactory already indicates a table, so I am OK with it.
>
> New Context should be:
>
> /**
>  * Context of table source creation. Contains table information and environment information.
>  */
> interface Context {
>     /**
>      * @return full identifier of the given {@link CatalogTable}.
>      */
>     ObjectIdentifier getObjectIdentifier();
>     /**
>      * @return table {@link CatalogTable} instance.
>      */
>     CatalogTable getTable();
>     /**
>      * @return readable config of this table environment.
>      */
>     ReadableConfig getConfiguration();
> }
>
>
> Best,
> Jingsong Lee
>
> On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Jingsong,
>>
>> some last minute changes from my side:
>>
>> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
>> obvious. Otherwise people expect a `TableIdentifier` class being
>> returned here.
>>
>> 2. rename `getTableConfig` to `getConfiguration()`; in the future this
>> will not only be a "table" config but might give access to the full
>> Flink config
>>
>> Thanks,
>> Timo
>>
>>
>> On 04.02.20 06:27, Jingsong Li wrote:
>> > So the interface will be:
>> >
>> > public interface TableSourceFactory<T> extends TableFactory {
>> >     ......
>> >
>> >     /**
>> >      * Creates and configures a {@link TableSource} based on the given {@link Context}.
>> >      *
>> >      * @param context context of this table source.
>> >      * @return the configured table source.
>> >      */
>> >     default TableSource<T> createTableSource(Context context) {
>> >         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >         return createTableSource(
>> >                 new ObjectPath(tableIdentifier.getDatabaseName(),
>> >                         tableIdentifier.getObjectName()),
>> >                 context.getTable());
>> >     }
>> >     /**
>> >      * Context of table source creation. Contains table information and environment information.
>> >      */
>> >     interface Context {
>> >         /**
>> >          * @return full identifier of the given {@link CatalogTable}.
>> >          */
>> >         ObjectIdentifier getTableIdentifier();
>> >         /**
>> >          * @return table {@link CatalogTable} instance.
>> >          */
>> >         CatalogTable getTable();
>> >         /**
>> >          * @return readable config of this table environment.
>> >          */
>> >         ReadableConfig getTableConfig();
>> >     }
>> > }
>> >
>> > public interface TableSinkFactory<T> extends TableFactory {
>> >     ......
>> >     /**
>> >      * Creates and configures a {@link TableSink} based on the given {@link Context}.
>> >      *
>> >      * @param context context of this table sink.
>> >      * @return the configured table sink.
>> >      */
>> >     default TableSink<T> createTableSink(Context context) {
>> >         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >         return createTableSink(
>> >                 new ObjectPath(tableIdentifier.getDatabaseName(),
>> >                         tableIdentifier.getObjectName()),
>> >                 context.getTable());
>> >     }
>> >     /**
>> >      * Context of table sink creation. Contains table information and environment information.
>> >      */
>> >     interface Context {
>> >         /**
>> >          * @return full identifier of the given {@link CatalogTable}.
>> >          */
>> >         ObjectIdentifier getTableIdentifier();
>> >         /**
>> >          * @return table {@link CatalogTable} instance.
>> >          */
>> >         CatalogTable getTable();
>> >         /**
>> >          * @return readable config of this table environment.
>> >          */
>> >         ReadableConfig getTableConfig();
>> >     }
>> > }
>> >
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >
>> >> Hi all,
>> >>
>> >> After rethinking and discussing with Kurt, I'd like to remove "isBounded".
>> >> We can defer this boundedness information to the TableSink.
>> >> With the TableSink refactoring, we need to consider "consumeDataStream"
>> >> and "consumeBoundedStream".
>> >>
>> >> Best,
>> >> Jingsong Lee
>> >>
>> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >>
>> >>> Hi Jark,
>> >>>
>> >>> Thanks for getting involved. Yes, adding isBounded to the source is
>> >>> hard to understand.
>> >>> I recommend adding it only to the sink at present, because the sink has an upstream.
>> >>> Its upstream is either bounded or unbounded.
>> >>>
>> >>> Hi all,
>> >>>
>> >>> Let me summarize with your suggestions.
>> >>>
>> >>> public interface TableSourceFactory<T> extends TableFactory {
>> >>>
>> >>>     ......
>> >>>
>> >>>
>> >>>     /**
>> >>>      * Creates and configures a {@link TableSource} based on the given {@link Context}.
>> >>>      *
>> >>>      * @param context context of this table source.
>> >>>      * @return the configured table source.
>> >>>      */
>> >>>     default TableSource<T> createTableSource(Context context) {
>> >>>         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >>>         return createTableSource(
>> >>>                 new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>> >>>                 context.getTable());
>> >>>     }
>> >>>
>> >>>     /**
>> >>>      * Context of table source creation. Contains table information and environment information.
>> >>>      */
>> >>>     interface Context {
>> >>>
>> >>>         /**
>> >>>          * @return full identifier of the given {@link CatalogTable}.
>> >>>          */
>> >>>         ObjectIdentifier getTableIdentifier();
>> >>>
>> >>>         /**
>> >>>          * @return table {@link CatalogTable} instance.
>> >>>          */
>> >>>         CatalogTable getTable();
>> >>>
>> >>>         /**
>> >>>          * @return readable config of this table environment.
>> >>>          */
>> >>>         ReadableConfig getTableConfig();
>> >>>     }
>> >>> }
>> >>>
>> >>> public interface TableSinkFactory<T> extends TableFactory {
>> >>>
>> >>>     ......
>> >>>
>> >>>     /**
>> >>>      * Creates and configures a {@link TableSink} based on the given {@link Context}.
>> >>>      *
>> >>>      * @param context context of this table sink.
>> >>>      * @return the configured table sink.
>> >>>      */
>> >>>     default TableSink<T> createTableSink(Context context) {
>> >>>         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >>>         return createTableSink(
>> >>>                 new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>> >>>                 context.getTable());
>> >>>     }
>> >>>
>> >>>     /**
>> >>>      * Context of table sink creation. Contains table information and environment information.
>> >>>      */
>> >>>     interface Context {
>> >>>
>> >>>         /**
>> >>>          * @return full identifier of the given {@link CatalogTable}.
>> >>>          */
>> >>>         ObjectIdentifier getTableIdentifier();
>> >>>
>> >>>         /**
>> >>>          * @return table {@link CatalogTable} instance.
>> >>>          */
>> >>>         CatalogTable getTable();
>> >>>
>> >>>         /**
>> >>>          * @return readable config of this table environment.
>> >>>          */
>> >>>         ReadableConfig getTableConfig();
>> >>>
>> >>>         /**
>> >>>          * @return whether the input is bounded.
>> >>>          */
>> >>>         boolean isBounded();
>> >>>     }
>> >>> }
>> >>>
>> >>> If there is no objection, I will start a vote thread. (If necessary, I
>> >>> can also write a FLIP.)
>> >>>
>> >>> Best,
>> >>> Jingsong Lee
>> >>>
>> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >>>
>> >>>> Thanks Bowen and Timo for getting involved.
>> >>>>
>> >>>> Hi Bowen,
>> >>>>
>> >>>>> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>> >>>> I think it is better to keep one method, since in [1] we reached
>> >>>> agreement in the DataStream layer to maintain a single API in "env.source".
>> >>>> I think it is good not to split batch and stream, and our
>> >>>> TableSource/TableSink are the same class for both batch and streaming too.
>> >>>>
>> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
>> >>>> As Timo said, we may have more parameters to add in the future. Take a
>> >>>> look at "AbstractRichFunction.RuntimeContext": it has grown little by little.
>> >>>>
>> >>>> Hi Timo,
>> >>>>
>> >>>> Your suggestion about Context looks good to me.
>> >>>> "TablePath" is used in Hive for updating the catalog information of this
>> >>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>> >>>>
>> >>>>> Can we postpone the change of TableValidators?
>> >>>> Yes, ConfigOption validation looks good to me. It seems that you have
>> >>>> been thinking about this for a long time. It's very good. Looking forward
>> >>>> to progress on FLIP-54.
>> >>>>
>> >>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>> >>>>
>> >>>> Best,
>> >>>> Jingsong Lee
>> >>>>
>> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
>> >>>>
>> >>>>> Hi Jingsong,
>> >>>>>
>> >>>>> +1 for adding a context in the source and sink factories. A context
>> >>>>> class also allows for future modifications without touching the
>> >>>>> TableFactory interface again.
>> >>>>>
>> >>>>> How about:
>> >>>>>
>> >>>>> interface TableSourceFactory {
>> >>>>>     interface Context {
>> >>>>>         // ...
>> >>>>>     }
>> >>>>> }
>> >>>>>
>> >>>>> Because I find the name `CatalogTableContext` confusing and we can
>> >>>>> bind the interface to the factory class itself as an inner interface.
>> >>>>>
>> >>>>> Readable access to configuration also sounds right to me. Can we remove
>> >>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>> >>>>> factory should know the path. And if so, it should be an
>> >>>>> `ObjectIdentifier` instead to also know about the catalog we are using.
>> >>>>>
>> >>>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>> >>>>> would like to use terminology around boundedness rather than
>> >>>>> streaming/batch.
>> >>>>>
>> >>>>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>> >>>>> avoid specialized methods in the future.
>> >>>>>
>> >>>>> Can we postpone the change of TableValidators? I don't think that every
>> >>>>> factory needs a schema validator. Ideally, the factory should just
>> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>> >>>>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>> >>>>> currently our config options are not rich enough to have a unified
>> >>>>> validation. Additionally, the factory should return some properties
>> >>>>> such as "supports event-time" for the schema validation outside of the
>> >>>>> factory itself.
>> >>>>>
>> >>>>> Regards,
>> >>>>> Timo
>> >>>>>
>> >>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On 16.01.20 00:51, Bowen Li wrote:
>> >>>>>> Hi Jingsong,
>> >>>>>>
>> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm more
>> >>>>>> familiar with them. I agree these are shortcomings of the current
>> >>>>>> Flink SQL design.
>> >>>>>>
>> >>>>>> A couple of comments on your 1st proposal:
>> >>>>>>
>> >>>>>> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory (would be
>> >>>>>> similar for sink factory) to let the planner handle which mode (streaming
>> >>>>>> vs batch) of source should be instantiated? That way connector developers
>> >>>>>> don't always need to handle an if-else on isStreamingMode.
>> >>>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class. The
>> >>>>>> path, table, and config are fairly independent of each other. So why not
>> >>>>>> pass the config in as 3rd parameter as `createXxxTableSource(path,
>> >>>>>> catalogTable, tableConfig)`?
>> >>>>>>
>> >>>>>>
>> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>> >>>>>>
>> >>>>>>> Hi dev,
>> >>>>>>>
>> >>>>>>> I'd like to kick off a discussion on the improvement of TableSourceFactory
>> >>>>>>> and TableSinkFactory.
>> >>>>>>>
>> >>>>>>> Motivation:
>> >>>>>>> Now the main needs and problems are:
>> >>>>>>> 1. Connectors can't get the TableConfig [1], and some behaviors really need
>> >>>>>>> to be controlled by the user's table configuration. In the era of catalogs,
>> >>>>>>> we can't put these configs into connector properties, which is too
>> >>>>>>> inconvenient.
>> >>>>>>> 2. Connectors can't know if this is batch or stream execution mode. But the
>> >>>>>>> sink implementation of batch and stream is totally different. I understand
>> >>>>>>> there is an update mode property now, but it splits the batch and stream in
>> >>>>>>> the catalog dimension. In fact, this information can be obtained through
>> >>>>>>> the current TableEnvironment.
>> >>>>>>> 3. There is no interface for calling validation. Right now our validation is
>> >>>>>>> mostly util classes. It depends on whether or not the connector calls them.
>> >>>>>>> Now we have some new validations to add, such as [2], which really
>> >>>>>>> confuses users and even developers. Another problem is that our SQL
>> >>>>>>> update (DDL) does not have validation [3]. It is better to report an
>> >>>>>>> error when executing DDL, otherwise it will confuse the user.
>> >>>>>>>
>> >>>>>>> Proposed change draft for 1 and 2:
>> >>>>>>>
>> >>>>>>> interface CatalogTableContext {
>> >>>>>>>     ObjectPath getTablePath();
>> >>>>>>>     CatalogTable getTable();
>> >>>>>>>     ReadableConfig getTableConfig();
>> >>>>>>>     boolean isStreamingMode();
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
>> >>>>>>>
>> >>>>>>>     default TableSource<T> createTableSource(CatalogTableContext context) {
>> >>>>>>>         return createTableSource(context.getTablePath(), context.getTable());
>> >>>>>>>     }
>> >>>>>>>
>> >>>>>>>     ......
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> Proposed change draft for 3:
>> >>>>>>>
>> >>>>>>> public interface TableFactory {
>> >>>>>>>
>> >>>>>>>     TableValidators validators();
>> >>>>>>>
>> >>>>>>>     interface TableValidators {
>> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
>> >>>>>>>         TableSchemaValidator schemaValidator();
>> >>>>>>>         FormatDescriptorValidator formatValidator();
>> >>>>>>>     }
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> What do you think?
>> >>>>>>>
>> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
>> >>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Jingsong Lee
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>> --
>> >>>> Best, Jingsong Lee
>> >>>>
>> >>>
>> >>>
>> >>> --
>> >>> Best, Jingsong Lee
>> >>>
>> >>
>> >>
>> >> --
>> >> Best, Jingsong Lee
>> >>
>> >
>> >
>>
>
> --
> Best, Jingsong Lee
>

--
Best, Jingsong Lee
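
For readers catching up on the thread, here is a minimal sketch of how the Context agreed on above (getObjectIdentifier, getTable, getConfiguration) could be used by a connector. It is only an illustration under assumptions: ObjectIdentifier, ObjectPath, CatalogTable, ReadableConfig and TableSource below are simplified stand-ins so the file compiles on its own, not the real Flink classes. In particular the string-keyed getOptional(String) is a simplification (the real ReadableConfig is keyed by ConfigOption), and the two factories, the contextOf helper and the key "demo.fetch-size" are invented for the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/** Sketch only: every nested type is a simplified stand-in, not the real Flink class. */
final class FactoryContextSketch {

    static final class ObjectIdentifier {
        final String catalogName, databaseName, objectName;
        ObjectIdentifier(String catalogName, String databaseName, String objectName) {
            this.catalogName = catalogName;
            this.databaseName = databaseName;
            this.objectName = objectName;
        }
    }

    static final class ObjectPath {
        final String databaseName, objectName;
        ObjectPath(String databaseName, String objectName) {
            this.databaseName = databaseName;
            this.objectName = objectName;
        }
    }

    static final class CatalogTable {
        final Map<String, String> properties;
        CatalogTable(Map<String, String> properties) { this.properties = properties; }
    }

    /** Assumption: string keys for brevity; the real ReadableConfig is keyed by ConfigOption. */
    interface ReadableConfig {
        Optional<String> getOptional(String key);
    }

    interface TableSource<T> {
        String describe();
    }

    interface TableSourceFactory<T> {
        /** Existing entry point: sees only the database and table name. */
        TableSource<T> createTableSource(ObjectPath path, CatalogTable table);

        /** New entry point: falls back to the existing method unless overridden. */
        default TableSource<T> createTableSource(Context context) {
            ObjectIdentifier id = context.getObjectIdentifier();
            return createTableSource(new ObjectPath(id.databaseName, id.objectName), context.getTable());
        }

        interface Context {
            ObjectIdentifier getObjectIdentifier();
            CatalogTable getTable();
            ReadableConfig getConfiguration();
        }
    }

    /** Hypothetical connector that has not been migrated; it relies on the default method. */
    static final class LegacySourceFactory implements TableSourceFactory<String> {
        @Override
        public TableSource<String> createTableSource(ObjectPath path, CatalogTable table) {
            return () -> "legacy:" + path.databaseName + "." + path.objectName;
        }
    }

    /** Hypothetical connector that overrides the new method to read the configuration. */
    static final class ConfigAwareSourceFactory implements TableSourceFactory<String> {
        @Override
        public TableSource<String> createTableSource(ObjectPath path, CatalogTable table) {
            return () -> path.databaseName + "." + path.objectName;
        }

        @Override
        public TableSource<String> createTableSource(Context context) {
            ObjectIdentifier id = context.getObjectIdentifier();
            // "demo.fetch-size" is an invented key, used only for this illustration.
            String fetchSize = context.getConfiguration().getOptional("demo.fetch-size").orElse("100");
            return () -> id.catalogName + "." + id.databaseName + "." + id.objectName + " fetch=" + fetchSize;
        }
    }

    /** Hypothetical helper that builds a Context from plain values. */
    static TableSourceFactory.Context contextOf(ObjectIdentifier id, CatalogTable table, Map<String, String> conf) {
        return new TableSourceFactory.Context() {
            @Override public ObjectIdentifier getObjectIdentifier() { return id; }
            @Override public CatalogTable getTable() { return table; }
            @Override public ReadableConfig getConfiguration() { return key -> Optional.ofNullable(conf.get(key)); }
        };
    }

    public static void main(String[] args) {
        TableSourceFactory.Context context = contextOf(
                new ObjectIdentifier("cat", "db", "orders"),
                new CatalogTable(Collections.emptyMap()),
                Collections.singletonMap("demo.fetch-size", "500"));
        System.out.println(new LegacySourceFactory().createTableSource(context).describe());
        System.out.println(new ConfigAwareSourceFactory().createTableSource(context).describe());
    }
}
```

The point of the default method is that an unmigrated factory keeps working through the old (ObjectPath, CatalogTable) signature, while a migrated one can see the catalog name and the configuration. With the values in main, the first call prints legacy:db.orders and the second prints cat.db.orders fetch=500.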