[ https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880229#comment-15880229 ]
jingzhang commented on FLINK-5568:
----------------------------------

[~fhueske], thanks for your advice. Here are my thoughts on your questions; looking forward to your opinion.
1. {{ExternalCatalogTable}} is the table definition or description in the external catalog. {{ExternalCatalogTable}} does not extend {{FlinkTable}}. ({{FlinkTable}} is a table of the Calcite catalog because it extends Calcite's {{Table}}.) {{ExternalCatalogTable}}, by contrast, is a table of the external catalog. When the {{CalciteCatalogReader}} looks up a table in the Calcite catalog, the Calcite schema first looks up the {{ExternalCatalogTable}} instance in the underlying external catalog and then returns a {{TableSourceTable}} holding the {{TableSource}} that the converter generates from the {{ExternalCatalogTable}}.
2. Yes, it is better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for the unclear explanation. We do not want to implement a new Schema class; in fact, we prefer to use Flink's representation. The {{DataSchema}} model is as follows:
{code}
import org.apache.flink.api.common.typeinfo.TypeInformation

case class DataSchema(
    columnTypes: Array[TypeInformation[_]],
    columnNames: Array[String])
{code}
4. It is important to know where to scan for the {{TableSource}} classes that are annotated with {{@ExternalCatalogCompatible}}. We plan to rely on a configuration file:
* each connector specifies its scan packages in an agreed configuration file.
* we look up all resources with that name through the classloader and parse their scan-packages fields.
Looking forward to your advice, thanks.

> Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema
> -----------------------------------------------------------------------------------------------------------
>
>          Key: FLINK-5568
>          URL: https://issues.apache.org/jira/browse/FLINK-5568
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API & SQL
>     Reporter: Kurt Young
>     Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary tables. It registers the temp tables in the Calcite catalog, so SQL and Table API queries can access those temp tables. Currently, DatasetTable, DataStreamTable and TableSourceTable can be registered in the {{TableEnvironment}} as temporary tables.
> This issue aims to provide a mechanism to connect external catalogs such as HCatalog to the {{TableEnvironment}}, so SQL and Table API queries can access tables in external catalogs without registering those tables in the {{TableEnvironment}} beforehand.
> First, we should point out that there are actually two kinds of catalogs in Flink.
> The first one is the external catalog mentioned above; it provides CRUD operations for databases/tables.
> The second one is the Calcite catalog; it defines the namespaces that can be accessed in Calcite queries. It builds on Calcite's Schema/Table abstractions. SqlValidator and SqlConverter depend on the Calcite catalog to fetch the tables referenced in SQL or Table API queries.
> So we need to do the following things:
> 1. introduce an interface for the external catalog, and maybe provide an in-memory implementation first for test and development environments.
> 2. introduce a mechanism to connect the external catalog with the Calcite catalog so the tables/databases in the external catalog can be accessed through the Calcite catalog. This includes converting the databases of the external catalog to Calcite sub-schemas and converting the tables in a database of the external catalog to Calcite tables (only {{TableSourceTable}} is supported).
> 3. register the external catalog with the {{TableEnvironment}}.
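To make task 1 above more concrete, here is a minimal sketch of what the external catalog interface and its in-memory implementation could look like. All names and signatures below ({{ExternalCatalog}}, {{InMemoryExternalCatalog}}, {{TableIdentifier}}, the method set) are illustrative assumptions, not the final API:
{code}
import scala.collection.mutable

// Illustrative stand-ins for the types discussed in this issue; names are assumptions.
case class TableIdentifier(dbName: String, tableName: String)
case class ExternalCatalogTable(
    identifier: TableIdentifier,
    tableType: String,
    properties: Map[String, String] = Map.empty)

// Hypothetical external catalog interface (task 1): CRUD on databases/tables.
trait ExternalCatalog {
  def getTable(dbName: String, tableName: String): ExternalCatalogTable
  def listTables(dbName: String): Seq[String]
  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String): Unit
}

// Simple in-memory implementation for test and development environments.
class InMemoryExternalCatalog extends ExternalCatalog {
  // dbName -> (tableName -> table definition)
  private val databases =
    mutable.Map.empty[String, mutable.Map[String, ExternalCatalogTable]]

  override def getTable(dbName: String, tableName: String): ExternalCatalogTable =
    databases.get(dbName).flatMap(_.get(tableName)).getOrElse(
      throw new NoSuchElementException(s"$dbName.$tableName does not exist"))

  override def listTables(dbName: String): Seq[String] =
    databases.get(dbName).map(_.keys.toSeq).getOrElse(Seq.empty)

  override def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit = {
    val tables =
      databases.getOrElseUpdate(table.identifier.dbName, mutable.Map.empty)
    if (tables.contains(table.identifier.tableName)) {
      if (!ignoreIfExists) {
        throw new IllegalArgumentException(
          s"${table.identifier.tableName} already exists")
      }
    } else {
      tables(table.identifier.tableName) = table
    }
  }

  override def dropTable(dbName: String, tableName: String): Unit =
    databases.get(dbName).foreach(_.remove(tableName))
}
{code}
With something along these lines, the Calcite schema integration (point 1 of the comment above) only needs to call {{getTable}} and hand the result to the converter mechanism described below.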
> Here is the design of {{ExternalCatalogTable}}:
> ||field||type||description||
> | identifier | TableIdentifier | dbName and tableName of the table |
> | tableType | String | type of the external catalog table, e.g. csv, hbase, kafka |
> | schema | DataSchema | schema of the table data, including column names and column types |
> | partitionColumnNames | List<String> | names of the partition columns |
> | properties | Map<String, String> | properties of the external catalog table |
> | stats | TableStats | statistics of the external catalog table |
> | comment | String | |
> | create time | long | |
> There is still one detail that needs to be taken into consideration: how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. This question is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}}, because we can easily get a {{TableSourceTable}} from a {{TableSource}}.
> Different {{TableSource}} implementations need different fields to instantiate. E.g., {{CsvTableSource}} needs path, fieldNames, fieldTypes, fieldDelim, rowDelim and so on to create a new instance, while {{KafkaTableSource}} needs a configuration and a table name. So it is not a good idea to let the Flink framework be responsible for translating an {{ExternalCatalogTable}} to the different kinds of {{TableSourceTable}}.
> Here is one solution: let the {{TableSource}} specify a converter.
> 1. provide an annotation named {{ExternalCatalogCompatible}}. A {{TableSource}} with this annotation is compatible with the external catalog, that is, it can be converted to or from an {{ExternalCatalogTable}}. The annotation specifies the tableType and the converter of the table source. For example, {{CsvTableSource}} specifies tableType csv and converter class {{CsvTableSourceConverter}}:
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
>   ...
> }
> {code}
> 2. scan all {{TableSource}} classes with the {{ExternalCatalogCompatible}} annotation and save the tableType and converter in a map.
> 3. when an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, get the converter based on the tableType and let the converter do the conversion.
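To make steps 2 and 3 concrete, here is a minimal sketch of the converter lookup, assuming a hypothetical {{TableSourceConverter}} trait and a registry keyed by tableType; the names ({{TableSourceConverterRegistry}}, the stand-in types) are illustrative, not a final API:
{code}
import scala.collection.mutable

// Stand-ins so the sketch is self-contained; names are assumptions.
trait TableSource
case class ExternalCatalogTable(tableType: String, properties: Map[String, String])

// Hypothetical converter contract: builds a TableSource from a catalog definition.
trait TableSourceConverter {
  def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource
}

// Step 2: the scan over the annotated TableSources fills this registry,
// mapping tableType (e.g. "csv", "kafka") to its converter instance.
object TableSourceConverterRegistry {
  private val converters = mutable.Map.empty[String, TableSourceConverter]

  def register(tableType: String, converter: TableSourceConverter): Unit =
    converters(tableType) = converter

  // Step 3: pick the converter by the table's tableType and delegate.
  def toTableSource(table: ExternalCatalogTable): TableSource = {
    val converter = converters.getOrElse(
      table.tableType,
      throw new NoSuchElementException(
        s"no converter registered for table type '${table.tableType}'"))
    converter.fromExternalCatalogTable(table)
  }
}
{code}
A {{CsvTableSourceConverter}}, for example, would read the path, field names/types and delimiters from {{properties}} and instantiate a {{CsvTableSource}}; the Calcite schema then only calls {{TableSourceConverterRegistry.toTableSource(...)}} and wraps the result in a {{TableSourceTable}}.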