[
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879755#comment-15879755
]
jingzhang commented on FLINK-5568:
----------------------------------
[~fhueske], thanks for your response. There is still one detail that needs
discussion: how to convert ExternalCatalogTable to TableSourceTable. I have
added the question and one possible solution to the description. Looking
forward to your advice.
> Introduce interface for catalog, and provide an in-memory implementation, and
> integrate with calcite schema
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kurt Young
> Assignee: jingzhang
>
> The {{TableEnvironment}} currently provides a mechanism to register temporary
> tables. It registers each temporary table in the Calcite catalog, so SQL and
> Table API queries can access those tables. Currently, DatasetTable,
> DataStreamTable and TableSourceTable can be registered with the
> {{TableEnvironment}} as temporary tables.
> This issue aims to provide a mechanism to connect external catalogs such as
> HCatalog to the {{TableEnvironment}}, so that SQL and Table API queries can
> access tables in the external catalogs without registering those tables with
> the {{TableEnvironment}} beforehand.
> First, note that there are actually two kinds of catalog in Flink.
> The first is the external catalog mentioned above, which provides CRUD
> operations on databases and tables.
> The second is the Calcite catalog, which defines the namespace that can be
> accessed in Calcite queries. It depends on the Calcite Schema/Table
> abstraction. SqlValidator and SqlConverter depend on the Calcite catalog to
> resolve the tables referenced in SQL or Table API queries.
> So we need to do the following:
> 1. Introduce an interface for external catalogs, and perhaps provide an
> in-memory implementation first for test and development environments.
> 2. Introduce a mechanism to connect an external catalog with the Calcite
> catalog, so that the tables and databases in the external catalog can be
> accessed through the Calcite catalog. This includes converting the databases
> of an external catalog to Calcite sub-schemas and converting the tables in a
> database of an external catalog to Calcite tables (only {{TableSourceTable}}
> is supported).
> 3. Register the external catalog with the {{TableEnvironment}}.
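Step 1 above could look roughly like the following. This is only a minimal sketch: the trait {{ExternalCatalog}}, its method names, the class {{InMemoryExternalCatalog}} and the fields of {{ExternalCatalogTable}} are all assumptions made for illustration, not an agreed API. Steps 2 and 3 are left out because they depend on Calcite and on the {{TableEnvironment}}.

```scala
import scala.collection.mutable

// Hypothetical catalog entry: the field names are assumptions for this sketch.
final case class ExternalCatalogTable(
    tableType: String,
    properties: Map[String, String])

// Hypothetical interface exposing CRUD-style operations on databases and tables.
trait ExternalCatalog {
  def createDatabase(db: String): Unit
  def listDatabases(): Seq[String]
  def createTable(db: String, name: String, table: ExternalCatalogTable): Unit
  def getTable(db: String, name: String): Option[ExternalCatalogTable]
  def listTables(db: String): Seq[String]
  def dropTable(db: String, name: String): Unit
}

// In-memory implementation intended for tests and local development only.
class InMemoryExternalCatalog extends ExternalCatalog {
  private type Tables = mutable.LinkedHashMap[String, ExternalCatalogTable]

  // database name -> (table name -> table), kept in insertion order
  private val databases = mutable.LinkedHashMap.empty[String, Tables]

  // Looks up a database and fails loudly if it does not exist.
  private def database(db: String): Tables =
    databases.getOrElse(db, throw new NoSuchElementException(s"Database not found: $db"))

  // Creating a database that already exists is a no-op.
  override def createDatabase(db: String): Unit = {
    databases.getOrElseUpdate(db, mutable.LinkedHashMap.empty)
    ()
  }

  override def listDatabases(): Seq[String] = databases.keys.toSeq

  // Adds the table, replacing any existing table with the same name.
  override def createTable(db: String, name: String, table: ExternalCatalogTable): Unit =
    database(db)(name) = table

  // Returns None if either the database or the table is missing.
  override def getTable(db: String, name: String): Option[ExternalCatalogTable] =
    databases.get(db).flatMap(_.get(name))

  override def listTables(db: String): Seq[String] = database(db).keys.toSeq

  // Dropping a table that does not exist is a no-op.
  override def dropTable(db: String, name: String): Unit = {
    database(db).remove(name)
    ()
  }
}
```

A Calcite schema adapter for step 2 would then only need {{listDatabases}}, {{listTables}} and {{getTable}} to expose sub-schemas and tables lazily.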
> One detail still needs to be taken into consideration: how to convert an
> {{ExternalCatalogTable}} to a {{TableSourceTable}}. The question is
> equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}},
> because we can easily get a {{TableSourceTable}} from a {{TableSource}}.
> Different {{TableSource}} implementations often need different fields to
> create an instance. For example, {{CsvTableSource}} needs path, fieldName,
> fieldTypes, fieldDelim, rowDelim and so on, while {{KafkaTableSource}} needs
> configuration and tableName. So it is not a good idea to make the Flink
> framework responsible for translating an {{ExternalCatalogTable}} into each
> kind of {{TableSourceTable}}.
> Here is one solution: let each {{TableSource}} specify a converter.
> 1. Provide an annotation named {{ExternalCatalogCompatible}}. A
> {{TableSource}} with this annotation is compatible with external catalogs,
> that is, it can be converted to or from an {{ExternalCatalogTable}}. The
> annotation specifies the tableType and the converter of the TableSource. For
> example, for {{CsvTableSource}}, it specifies that the tableType is csv and
> the converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(
>   tableType = "csv",
>   converter = classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
>   ...
> }
> {code}
> 2. Scan all TableSources that carry the ExternalCatalogCompatible annotation
> and save each tableType and its converter in a Map.
> 3. When an {{ExternalCatalogTable}} needs to be converted to a
> {{TableSource}}, look up the converter by tableType and let the converter
> perform the conversion.
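The lookup in steps 2 and 3 above might be sketched as follows. Everything here is an assumption for illustration: {{TableSource}} and {{CsvTableSource}} are simplified stand-ins so the snippet compiles without Flink, the {{TableSourceConverter}} trait and {{TableSourceConverterRegistry}} class are hypothetical names, and the property keys are made up. The annotation scan itself is omitted, so the tableType-to-converter Map is populated by hand.

```scala
// Hypothetical catalog entry: the field names are assumptions for this sketch.
final case class ExternalCatalogTable(
    tableType: String,
    properties: Map[String, String])

// Stand-in for Flink's TableSource so the sketch compiles on its own.
trait TableSource

// Simplified stand-in for CsvTableSource with only two of its fields.
final case class CsvTableSource(path: String, fieldDelim: String) extends TableSource

// Hypothetical converter contract that each TableSource would provide.
trait TableSourceConverter {
  def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource
}

// Only the converter knows which properties a CsvTableSource needs.
class CsvTableSourceConverter extends TableSourceConverter {
  override def fromExternalCatalogTable(table: ExternalCatalogTable): CsvTableSource =
    CsvTableSource(
      path = table.properties.getOrElse(
        "path",
        throw new IllegalArgumentException("csv table requires a 'path' property")),
      // Assumed default delimiter when the catalog entry does not set one.
      fieldDelim = table.properties.getOrElse("fieldDelim", ","))
}

// Holds the result of step 2: tableType -> converter.
// The proposal fills this Map by scanning annotations; here it is passed in.
class TableSourceConverterRegistry(converters: Map[String, TableSourceConverter]) {

  // Step 3: pick the converter by tableType and delegate the conversion to it.
  def toTableSource(table: ExternalCatalogTable): TableSource =
    converters.get(table.tableType) match {
      case Some(converter) => converter.fromExternalCatalogTable(table)
      case None =>
        throw new IllegalArgumentException(
          s"No converter registered for table type: ${table.tableType}")
    }
}
```

With this split, the framework only depends on the converter contract, and adding a new kind of TableSource means registering one more converter rather than changing framework code.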
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)