[
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jingzhang updated FLINK-5568:
-----------------------------
Description:
The {{TableEnvironment}} now provides a mechanism to register temporary tables.
It registers the temp tables in the Calcite catalog, so SQL and Table API
queries can access those temp tables. Currently, {{DataSetTable}},
{{DataStreamTable}} and {{TableSourceTable}} can be registered in the
{{TableEnvironment}} as temporary tables.
This issue aims to provide a mechanism to connect external catalogs such as
HCatalog to the {{TableEnvironment}}, so that SQL and Table API queries can
access tables in the external catalogs without registering those tables in the
{{TableEnvironment}} beforehand.
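For example, the end state could hypothetically look like the snippet below;
the {{registerExternalCatalog}} method name and the catalog.database.table path
syntax are assumptions of this sketch, not a settled API.
{code}
// Hypothetical end state: register an external catalog once, then reference
// its tables directly in SQL without registering each table individually.
tableEnv.registerExternalCatalog("hive", hiveCatalog)
val result = tableEnv.sql("SELECT * FROM hive.sales.orders")
{code}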
First, we should point out that there are actually two kinds of catalogs in
Flink.
The first one is the external catalog mentioned above; it provides CRUD
operations on databases/tables.
The second one is the Calcite catalog; it defines the namespaces that can be
accessed in Calcite queries. It is built on Calcite's Schema/Table
abstractions. The SqlValidator and SqlConverter depend on the Calcite catalog
to resolve the tables referenced in SQL or Table API queries.
So we need to do the following things:
1. Introduce an interface for external catalogs, and perhaps provide an
in-memory implementation first for test and development environments (a sketch
follows after this list).
2. Introduce a mechanism to connect an external catalog to the Calcite catalog,
so that the tables/databases in the external catalog can be accessed through
the Calcite catalog. This includes converting the databases of the external
catalog to Calcite sub-schemas and converting the tables in a database of the
external catalog to Calcite tables (only {{TableSourceTable}} is supported).
3. Register the external catalog with the {{TableEnvironment}}.
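A minimal sketch of what the external catalog interface (point 1) could look
like, together with an in-memory implementation for tests. All names here
({{ExternalCatalog}}, the {{ExternalCatalogTable}} fields,
{{InMemoryExternalCatalog}}) are illustrative assumptions, not a final API:
{code}
// Sketch only: a read-oriented external catalog interface. The real
// interface would likely also expose create/alter/drop (CRUD) operations.
trait ExternalCatalog {
  def listDatabases(): Seq[String]
  def listTables(dbName: String): Seq[String]
  def getTable(dbName: String, tableName: String): ExternalCatalogTable
}

// Illustrative shape of a catalog table entry: a table type tag
// (e.g. "csv", "kafka") plus free-form properties for the converter.
case class ExternalCatalogTable(
  tableType: String,
  properties: Map[String, String])

// In-memory implementation for test and development environments.
class InMemoryExternalCatalog extends ExternalCatalog {
  import scala.collection.mutable

  private val databases =
    mutable.Map[String, mutable.Map[String, ExternalCatalogTable]]()

  def createTable(dbName: String, tableName: String,
      table: ExternalCatalogTable): Unit =
    databases.getOrElseUpdate(dbName, mutable.Map()) += (tableName -> table)

  override def listDatabases(): Seq[String] = databases.keys.toSeq

  override def listTables(dbName: String): Seq[String] =
    databases.get(dbName).map(_.keys.toSeq).getOrElse(Seq.empty)

  override def getTable(dbName: String, tableName: String): ExternalCatalogTable =
    databases(dbName)(tableName)
}
{code}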
There is still one detail that needs to be taken into consideration: how to
convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. This question is
equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}},
because we can easily build a {{TableSourceTable}} from a {{TableSource}}.
Different {{TableSource}} implementations often need different fields to create
an instance. E.g. {{CsvTableSource}} needs path, fieldNames, fieldTypes,
fieldDelim, rowDelim and so on to create a new instance, while
{{KafkaTableSource}} needs a configuration and a tableName to create a new
instance. So it's not a good idea to let the Flink framework be responsible for
translating an {{ExternalCatalogTable}} into the different kinds of
{{TableSourceTable}}.
Here is one solution: let each {{TableSource}} specify a converter.
1. Provide an annotation named {{ExternalCatalogCompatible}}. A {{TableSource}}
with this annotation is compatible with the external catalog, that is, it can
be converted to or from an {{ExternalCatalogTable}}. The annotation specifies
the tableType and the converter of the table source. For example, for
{{CsvTableSource}}, it specifies that the tableType is csv and the converter
class is {{CsvTableSourceConverter}} (a converter sketch follows the example
below).
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
  ...
}
{code}
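As an illustration of what such a converter might look like, here is a rough
sketch. The {{TableSourceConverter}} trait, its method name, the property keys,
and the simplified {{CsvTableSource}} stand-in are all assumptions of this
sketch; the real {{CsvTableSource}} constructor takes more arguments:
{code}
// Hypothetical converter contract (an assumption of this sketch): rebuild a
// table source from the properties stored in an ExternalCatalogTable.
trait TableSourceConverter[T] {
  def fromExternalCatalogTable(table: ExternalCatalogTable): T
}

// Simplified stand-in for the real CsvTableSource, just for this sketch.
class CsvTableSource(val path: String, val fieldDelim: String, val rowDelim: String)

// Sketch of a CSV converter: decode the properties it needs and instantiate
// the table source. Field names/types are omitted here for brevity.
class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
  override def fromExternalCatalogTable(table: ExternalCatalogTable): CsvTableSource =
    new CsvTableSource(
      table.properties("path"),
      table.properties.getOrElse("fieldDelim", ","),
      table.properties.getOrElse("rowDelim", "\n"))
}
{code}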
2. Scan all {{TableSource}} classes with the {{ExternalCatalogCompatible}}
annotation and save each tableType and its converter in a map.
3. When an {{ExternalCatalogTable}} needs to be converted to a
{{TableSource}}, look up the converter by tableType and let the converter do
the conversion (see the registry sketch below).
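Putting steps 2 and 3 together, the lookup boils down to a map from tableType
to converter. How the map gets populated (classpath scanning for the annotation
vs. manual registration) is an open detail, so the sketch below simply
registers a converter by hand:
{code}
// Sketch of the converter registry. In the proposal the map would be filled
// by scanning for @ExternalCatalogCompatible-annotated TableSources; here a
// converter is registered manually for brevity.
object TableSourceConverterRegistry {
  private val converters =
    scala.collection.mutable.Map[String, TableSourceConverter[_]]()

  def register(tableType: String, converter: TableSourceConverter[_]): Unit =
    converters += (tableType -> converter)

  // Step 3: look up the converter by the table's type tag and delegate.
  def convert(table: ExternalCatalogTable): Any =
    converters(table.tableType).fromExternalCatalogTable(table)
}

// Usage:
// TableSourceConverterRegistry.register("csv", new CsvTableSourceConverter)
// val source = TableSourceConverterRegistry.convert(csvTable)
{code}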
> Introduce interface for catalog, and provide an in-memory implementation, and
> integrate with calcite schema
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kurt Young
> Assignee: jingzhang
>