[
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880229#comment-15880229
]
jingzhang edited comment on FLINK-5568 at 2/23/17 10:28 AM:
------------------------------------------------------------
[~fhueske], thanks for your advice.
Here are my thoughts on your questions; looking forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition or description in the external
catalog.
{{ExternalCatalogTable}} does not extend {{FlinkTable}}. ({{FlinkTable}} is a table
of the Calcite catalog because it extends the Calcite {{Table}}.) {{ExternalCatalogTable}},
by contrast, is a table of the external catalog.
When {{CalciteCatalogReader}} looks up a table in the Calcite catalog, the Calcite
schema first delegates to its underlying externalCatalog to look up the
{{ExternalCatalogTable}} instance, and then returns a {{TableSourceTable}} which holds
the {{TableSource}} generated from the {{ExternalCatalogTable}} by its converter.
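A minimal sketch of that lookup path; the helper {{ExternalTableSourceUtil.toTableSource}} is only a placeholder for the converter lookup, not a final API:
{code}
// Sketch only: how the Calcite schema could resolve a table name by
// delegating to the external catalog. Helper names are placeholders.
def lookupTable(
    catalog: ExternalCatalog,
    dbName: String,
    tableName: String): TableSourceTable = {
  // 1. delegate to the external catalog for the table description
  val externalTable: ExternalCatalogTable = catalog.getTable(dbName, tableName)
  // 2. convert the description into a TableSource via its registered converter
  val tableSource: TableSource[_] = ExternalTableSourceUtil.toTableSource(externalTable)
  // 3. wrap the TableSource so Calcite can treat it as a table
  new TableSourceTable(tableSource)
}
{code}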
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for putting it unclearly. We don't want to implement a new Schema
class. In fact, we prefer to use Flink's own representation. The DataSchema model is
as follows:
{code}
case class DataSchema(
    columnTypes: Array[TypeInformation[_]],
    columnNames: Array[String])
{code}
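For example (just an illustration of the model above, using Flink's BasicTypeInfo constants):
{code}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

// a schema describing a table with a long "id" column and a string "name" column
val schema = DataSchema(
  columnTypes = Array[TypeInformation[_]](
    BasicTypeInfo.LONG_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO),
  columnNames = Array("id", "name"))
{code}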
4. It is important to know where to scan for the {{TableSource}} classes that are
annotated with {{@ExternalCatalogCompatible}}. We plan to rely on a configuration
file:
* let each connector specify its scan packages in a designated configuration
file.
* look up all resources with that name via the classloader and parse the
scan-packages field (a rough sketch of this option follows below).
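A rough sketch of the second option; the resource name {{external-catalog-compatible.properties}} and the key {{scan.packages}} are placeholders, not decided yet:
{code}
import java.util.Properties
import scala.collection.JavaConverters._

// Sketch only: collect the scan packages declared by each connector jar.
// The resource name and property key are placeholders.
def loadScanPackages(classLoader: ClassLoader): Seq[String] = {
  val resources = classLoader.getResources("external-catalog-compatible.properties")
  resources.asScala.flatMap { url =>
    val props = new Properties()
    val in = url.openStream()
    try props.load(in) finally in.close()
    // each connector lists its packages as a comma-separated value
    Option(props.getProperty("scan.packages")).toSeq
      .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
  }.toSeq
}
{code}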
Looking forward to your advice, thanks.
> Introduce interface for catalog, and provide an in-memory implementation, and
> integrate with calcite schema
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kurt Young
> Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary
> tables. It registers the temp tables to the calcite catalog, so SQL and TableAPI
> queries can access those temp tables. Currently DataSetTable, DataStreamTable
> and TableSourceTable can be registered to {{TableEnvironment}} as temporary
> tables.
> This issue aims to provide a mechanism to connect external catalogs such as
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries can
> access tables in the external catalogs without registering those tables to
> {{TableEnvironment}} beforehand.
> First, we should point out that there are actually two kinds of catalog in Flink.
> The first one is the external catalog, as mentioned before; it provides CRUD
> operations on databases/tables.
> The second one is the calcite catalog; it defines the namespaces that can be
> accessed in Calcite queries and is based on the Calcite Schema/Table abstraction.
> SqlValidator and SqlConverter depend on the calcite catalog to fetch the
> tables referenced in SQL or TableAPI queries.
> So we need to do the following things:
> 1. introduce an interface for the external catalog, and maybe provide an in-memory
> implementation first for test and development environments (a rough sketch of the
> interface follows after this list).
> 2. introduce a mechanism to connect the external catalog with the Calcite catalog so
> the tables/databases in the external catalog can be accessed in the Calcite catalog.
> This includes converting databases of the externalCatalog to Calcite sub-schemas and
> converting tables in a database of the externalCatalog to Calcite tables (only
> {{TableSourceTable}} is supported).
> 3. register the external catalog to the {{TableEnvironment}}.
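> A rough sketch of what the external catalog interface (step 1) could look like; the
> method and type names here are only illustrative, and the in-memory implementation
> would back them with simple maps:
> {code}
> // Sketch only: read-side view of the external catalog interface.
> // The full interface would also cover create/alter/drop operations
> // for databases and tables. ExternalCatalogDatabase is a placeholder
> // name for the database description.
> trait ExternalCatalog {
>   def getTable(dbName: String, tableName: String): ExternalCatalogTable
>   def listTables(dbName: String): Seq[String]
>   def getDatabase(dbName: String): ExternalCatalogDatabase
>   def listDatabases(): Seq[String]
> }
> {code}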
> Here is the design model of {{ExternalCatalogTable}}:
> || field || type || description ||
> | identifier | TableIdentifier | dbName and tableName of the table |
> | tableType | String | type of the external catalog table, e.g. csv, hbase, kafka |
> | schema | DataSchema | schema of the table data, including column names and column types |
> | partitionColumnNames | List<String> | names of the partition columns |
> | properties | Map<String, String> | properties of the external catalog table |
> | stats | TableStats | statistics of the external catalog table |
> | comment | String | |
> | create time | long | |
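> Expressed as a Scala case class, the model above could look roughly like this (field
> names follow the table; the default values are only illustrative):
> {code}
> // Sketch of the model above; TableIdentifier, DataSchema and TableStats are
> // the types listed in the table, not final signatures.
> case class ExternalCatalogTable(
>     identifier: TableIdentifier,                   // dbName and tableName
>     tableType: String,                             // e.g. "csv", "hbase", "kafka"
>     schema: DataSchema,                            // column names and types
>     partitionColumnNames: Seq[String] = Seq.empty, // partition columns, if any
>     properties: Map[String, String] = Map.empty,
>     stats: TableStats = null,                      // statistics, if available
>     comment: String = null,
>     createTime: Long = System.currentTimeMillis())
> {code}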
> There is still a detail that needs to be taken into consideration, that is,
> how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. The
> question is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}},
> because we can easily get a {{TableSourceTable}} from a {{TableSource}}.
> Different {{TableSource}} implementations often need different fields to create
> an instance. E.g. {{CsvTableSource}} needs path, fieldNames, fieldTypes,
> fieldDelim, rowDelim and so on to create a new instance, while
> {{KafkaTableSource}} needs a configuration and a tableName to create a new
> instance. So it's not a good idea to let the Flink framework be responsible for
> translating an {{ExternalCatalogTable}} to the different kinds of
> {{TableSourceTable}}.
> Here is one solution: let each {{TableSource}} specify a converter.
> 1. provide an annotation named {{ExternalCatalogCompatible}}. A
> {{TableSource}} with this annotation is compatible with the external
> catalog, that is, it can be converted to or from an ExternalCatalogTable. The
> annotation specifies the tableType and converter of the tableSource. For
> example, for {{CsvTableSource}}, it specifies that the tableType is csv and the
> converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
>   ...
> }
> {code}
> 2. scan all TableSources with the ExternalCatalogCompatible annotation and save
> the tableType and converter in a Map.
> 3. when we need to convert an {{ExternalCatalogTable}} to a {{TableSource}}, get the
> converter based on the tableType and let the converter do the conversion (a rough
> sketch follows below).
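> A minimal sketch of the converter contract and the registry built in step 2 (trait
> and method names are placeholders, not a final API):
> {code}
> // Sketch only: the contract a converter registered via the annotation could
> // fulfil, plus the tableType -> converter registry built by the scan (step 2).
> trait TableSourceConverter[T <: TableSource[_]] {
>   def fromExternalCatalogTable(table: ExternalCatalogTable): T
>   def toExternalCatalogTable(tableSource: T): ExternalCatalogTable
> }
>
> // built once from the scan of annotated TableSource classes
> val converters: Map[String, TableSourceConverter[_ <: TableSource[_]]] =
>   Map("csv" -> new CsvTableSourceConverter)
>
> // step 3: resolve the converter by tableType and convert
> def toTableSource(table: ExternalCatalogTable): TableSource[_] =
>   converters(table.tableType).fromExternalCatalogTable(table)
> {code}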