[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15766799#comment-15766799
 ] 

Fabian Hueske commented on FLINK-5280:
--------------------------------------

Our discussion has changed a bit how I see the problem that needs to be fixed. 

Let's start with the current interface of {{TableSource}} and its purpose: 

A {{TableSource}} defines the schema of a table which is internally produced as 
a {{DataSet}} ({{BatchTableSource}}) or {{DataStream}} ({{StreamTableSource}}). 
A TableSource basically converts a DataSet or DataStream into a Table 
internally. TableEnvironments have explicit methods to convert a DataSet or 
DataStream into a Table. There are two variants: 1) Without providing field 
names (example: {{java.BatchTableEnvironment.fromDataSet(DataSet dataset)}}). 
In this case the names are extracted from the TypeInformation of the DataSet or 
DataStream ("f0", "f1", ... for Tuple, the field names of a Pojo or case class, 
etc). 2) With parameters to specify the first level field names (example: 
{{java.BatchTableEnvironment.fromDataSet(DataSet dataset, String fields)}}). In 
case of a {{DataSet<Pojo>}}, the fields cannot be named by position; instead, 
each existing field must be renamed explicitly to have a clear mapping.
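
To illustrate variant 1, here is a minimal, self-contained Java sketch of how 
default field names could be derived. It does not use Flink classes: 
{{DefaultFieldNames}} and {{Person}} are hypothetical names made up for this 
example, and sorting the Pojo field names is an assumption of the sketch to get 
a deterministic order (reflection does not guarantee one).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, NOT part of Flink: mimics how default names are derived.
final class DefaultFieldNames {

    // Tuple-like types are named by position: "f0", "f1", ...
    static List<String> forTuple(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            names.add("f" + i);
        }
        return names;
    }

    // Pojo-like types are named after their non-static fields.
    // Assumption of this sketch: names are sorted to make the order deterministic.
    static List<String> forPojo(Class<?> pojo) {
        List<String> names = new ArrayList<>();
        for (Field f : pojo.getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers()) && !f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        Collections.sort(names);
        return names;
    }
}

// Example Pojo used only for illustration.
final class Person {
    public String name;
    public int age;
}
```

With these definitions, {{DefaultFieldNames.forTuple(3)}} yields the positional 
names and {{DefaultFieldNames.forPojo(Person.class)}} yields the Pojo's own 
field names, which is why positional renaming has no clear meaning for Pojos.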

The current interface has mainly three methods to describe the schema of the 
returned table:

* {{TypeInformation<T> getReturnType()}} returns the type of the {{DataSet}} or 
{{DataStream}} produced by the {{TableSource}}. 
* {{String[] getFieldNames()}} returns the names of the first level of fields. 
The names are similar to the field names specified when converting a DataSet or 
DataStream into a Table.
* {{TypeInformation[] getFieldTypes()}} returns the types of the first level of 
fields as Flink {{TypeInformation}}. These types are already included in the 
return type, which is usually a {{CompositeType}}.

I see the following issues with the current interface:

* {{getFieldTypes()}} is largely redundant with {{getReturnType()}}.
* {{getFieldNames()}} and {{getFieldTypes()}} must match the order of fields 
specified by the TypeInfo returned by {{getReturnType()}}. This is especially 
tricky for Pojos, where the order of fields is not explicit.
* {{getFieldNames()}} only allows specifying the names of the first level 
(this is the same as for {{fromDataSet()}} or {{fromDataStream()}}). Hence, it 
is not possible to rename nested fields.
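
The first two issues can be sketched in plain Java. The types below are 
hypothetical stand-ins, not the Flink interfaces: {{CompositeSketch}} plays the 
role of a composite return type, {{ThreeMethodSource}} mirrors the three schema 
methods described above, and {{Class<?>}} is used in place of 
{{TypeInformation}} to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a composite type: ordered field names and types.
final class CompositeSketch {
    final List<String> names;
    final List<Class<?>> types;

    CompositeSketch(List<String> names, List<Class<?>> types) {
        this.names = names;
        this.types = types;
    }
}

// Hypothetical rendering of the three schema methods discussed above.
interface ThreeMethodSource {
    CompositeSketch getReturnType();
    String[] getFieldNames();
    Class<?>[] getFieldTypes();
}

final class SchemaCheck {
    // getFieldTypes() has to repeat, in the same order, what the return type
    // already states, and getFieldNames() has to have the same length.
    static boolean typesAgree(ThreeMethodSource s) {
        CompositeSketch rt = s.getReturnType();
        return Arrays.asList(s.getFieldTypes()).equals(rt.types)
                && s.getFieldNames().length == rt.types.size();
    }
}

// A source whose author listed the fields in declaration order, while the
// return type (as for a Pojo) uses a different, implicit order.
final class MisorderedSource implements ThreeMethodSource {
    public CompositeSketch getReturnType() {
        return new CompositeSketch(
                Arrays.asList("age", "name"),
                Arrays.<Class<?>>asList(Integer.class, String.class));
    }

    public String[] getFieldNames() {
        return new String[] {"name", "age"};
    }

    public Class<?>[] getFieldTypes() {
        return new Class<?>[] {String.class, Integer.class};
    }
}
```

Here {{SchemaCheck.typesAgree(new MisorderedSource())}} returns false: the 
implementation looks reasonable on its own, but the three methods silently 
disagree about the field order.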


By following [~jark]'s proposal and only using {{getReturnType()}} we would 
address these issues as follows:

* {{getFieldTypes()}} and {{getFieldNames()}} would be removed, so they could 
no longer be redundant with or implicitly tied to {{getReturnType()}}. 
{{getNumberOfFields()}} would be removed as well.
* We would extract the Table schema only from the {{TypeInformation}} returned 
by {{getReturnType()}}. We can use existing logic for that (see 
{{fromDataSet()}}, {{fromDataStream()}}). Hence the behavior would be 
consistent with other parts of the API which is very good, IMO.
* By extracting the schema from the return type, we cannot override the field 
names for types with fixed field names (Tuple, Pojo, CaseClass). If a 
TableSource returns a Tuple, the fields will be named "f0", "f1", .... If the 
TableSource returns a Pojo or CaseClass, the fields will be named like the 
fields in the Pojo. 
* By fixing FLINK-5348 and extending {{RowTypeInfo}} to support custom field 
names, TableSources can use this type if they need to define custom field 
names. The returned type must be a Row then.
* Since we do not allow renaming fields, we do not have to care about mapping 
names to fields (no problem with the field order of Pojos). The order of fields 
is not important for the Table API and SQL.
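
As a rough sketch of the proposed shape, again in plain Java with hypothetical 
stand-ins rather than Flink classes: {{LeanSource}} keeps only 
{{getReturnType()}}, {{NamedRowType}} stands in for a row type with custom 
field names (the idea behind FLINK-5348), and {{TableSchemaSketch}} derives the 
schema from the return type alone. {{OrdersSource}} and its field names are 
made up for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a row type that carries custom field names.
final class NamedRowType {
    final List<String> names;
    final List<Class<?>> types;

    NamedRowType(String[] names, Class<?>[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException("names and types differ in length");
        }
        // Names and types live in one object, so they cannot drift apart.
        this.names = Collections.unmodifiableList(Arrays.asList(names.clone()));
        this.types = Collections.unmodifiableList(Arrays.asList(types.clone()));
    }
}

// Hypothetical lean source: the return type is the single source of truth.
interface LeanSource {
    NamedRowType getReturnType();
}

final class TableSchemaSketch {
    // The schema is extracted only from the return type.
    static List<String> fieldNames(LeanSource source) {
        return source.getReturnType().names;
    }
}

// Example source that needs custom field names and therefore returns a row type.
final class OrdersSource implements LeanSource {
    public NamedRowType getReturnType() {
        return new NamedRowType(
                new String[] {"user", "amount"},
                new Class<?>[] {String.class, Double.class});
    }
}
```

A source that is happy with the default names of its Tuple or Pojo type would 
not need anything like {{NamedRowType}}; only sources that want custom names 
would return a row type.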

To summarize, I think [~jark]'s proposal is very good and is the way to go. It 
would make the TableSource interface better because 

* it is consistent with other parts of the API
* it makes the interface leaner
* together with FLINK-5348, we can do everything that we need.

---

A specific Avro record is a Pojo which was code generated from an Avro schema. 
Flink can handle these generated classes as regular Pojos using the 
{{PojoTypeInfo}}.


> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
