[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882305#comment-15882305
 ] 

ASF GitHub Bot commented on FLINK-5568:
---------------------------------------

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/3409

    [flink-5570] [Table API & SQL]Support register external catalog to table 
environment

    This pr aims to support register external catalog to TableEnvironment.
    The pr contains two commits, the first one is about 
https://issues.apache.org/jira/browse/FLINK-5568, it's content is as same as 
(https://github.com/apache/flink/pull/3406).
    The second commit is to support externalCatalog registration. So please 
focus on the second commit when you review this pr.
    
    The main changes in the second commit including:
    1. add registerExternalCatalog method in TableEnvironment to register 
external catalog
    2. add scan method in TableEnvironment to scan the table of the external 
catalog
    3. add test cases for ExternalCatalog, including registration and scan

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink flink-5570

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3409
    
----
commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang <[email protected]>
Date:   2017-02-22T11:28:08Z

    Introduce interface for external catalog, and provide an in-memory 
implementation for test or develop. Integrate with calcite catalog.

commit 05e2b13847fab01e330d4bf2232886a793f7dd0c
Author: jingzhang <[email protected]>
Date:   2017-02-24T06:10:50Z

    Support register external catalog to table environment

----


> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5568
>                 URL: https://issues.apache.org/jira/browse/FLINK-5568
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kurt Young
>            Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables (only 
> support {{TableSourceTable}}).
> 3. register external catalog to {{TableEnvironment}}.
> Here is the design mode of ExternalCatalogTable.
> |  identifier                      | TableIdentifier | dbName and tableName 
> of table |
> |  tableType                     | String | type of external catalog table, 
> e.g csv, hbase, kafka
> |  schema                        | DataSchema|  schema of table data, 
> including column names and column types
> | partitionColumnNames | List<String> | names of partition column
> | properties                      | Map<String, String> |properties of 
> external catalog table
> | stats                               | TableStats | statistics of external 
> catalog table 
> | comment | String | 
> | create time | long
> There is still a detail problem need to be take into consideration, that is , 
> how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The 
> question is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} 
> because we could  easily get {{TableSourceTable}} from {{TableSource}}.
> Because different {{TableSource}} often contains different fields to initiate 
> an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance , 
> {{KafkaTableSource}} needs configuration and tableName to create a new 
> instance. So it's not a good idea to let Flink framework be responsible for 
> translate  {{ExternalCatalogTable}} to different kind of 
> {{TableSourceTable}}. 
> Here is one solution. Let {{TableSource}} specify a converter.
> 1. provide  an Annatition named {{ExternalCatalogCompatible}}. The 
> {{TableSource}} with the annotation means it is compatible with external 
> catalog, that is, it could be converted to or from ExternalCatalogTable. This 
> annotation specifies the tabletype and converter of the tableSource. For 
> example, for {{CsvTableSource}}, it specifies the tableType is csv and 
> converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = 
> classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
> the tableType and converter in a Map
> 3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
> converter based on tableType. and let converter do convert



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to