[ https://issues.apache.org/jira/browse/FLINK-24776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-24776:
---------------------------------
    Description: 
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface 
does not have clear requirements, and it's confusing for implementers. In particular, 
it's unclear whether the format needs to support projection push down or not, 
and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected 
and includes partition keys or not. An example of such a misunderstanding is 
shown here: 
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107

The PR https://github.com/apache/flink/pull/17544 partially addresses the 
issue, because it removes the need for BulkFormat implementations to take care 
of partition key handling. Nevertheless, it's still unclear whether formats 
support projections and whether they support nested projections.

We should refactor {{DecodingFormat}} as follows:

- Clarify DecodingFormat and introduce ProjectableDecodingFormat.
- Introduce ProjectedRowData and Projection to simplify implementations of 
connectors that need to deal with projections.
- Apply the changes to most of the formats and connectors we have.
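A rough sketch of how a source could use the split between a plain {{DecodingFormat}} and a {{ProjectableDecodingFormat}}: if the format declares itself projectable, the projection is pushed into it; otherwise the full physical row is decoded and projected afterwards. This is plain Java, not Flink's API: {{Format}}, {{ProjectableFormat}}, {{CsvFormat}}, {{ProjectableCsvFormat}} and {{Source.readProjected}} are hypothetical names, rows are modeled as {{Object[]}}, and only top-level projections are handled.

```java
/** Hypothetical stand-in for a plain DecodingFormat: always decodes the full physical row. */
interface Format {
    Object[] decode(String record);
}

/** Hypothetical stand-in for ProjectableDecodingFormat: can decode only the requested fields. */
interface ProjectableFormat extends Format {
    /** Whether index paths longer than one (nested fields) are accepted. */
    boolean supportsNestedProjection();

    Object[] decode(String record, int[] projection);
}

/** Toy CSV format that knows nothing about projections. */
class CsvFormat implements Format {
    public Object[] decode(String record) {
        return record.split(",");
    }
}

/** Toy CSV format that materializes only the projected columns. */
class ProjectableCsvFormat extends CsvFormat implements ProjectableFormat {
    public boolean supportsNestedProjection() {
        return false; // flat CSV has no nested fields
    }

    public Object[] decode(String record, int[] projection) {
        String[] all = record.split(",");
        Object[] out = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            out[i] = all[projection[i]];
        }
        return out;
    }
}

class Source {
    /** Pushes the projection into the format when possible, otherwise projects after decoding. */
    static Object[] readProjected(Format format, String record, int[] projection) {
        if (format instanceof ProjectableFormat) {
            return ((ProjectableFormat) format).decode(record, projection);
        }
        // Fallback: decode everything, then keep only the projected positions.
        Object[] full = format.decode(record);
        Object[] out = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            out[i] = full[projection[i]];
        }
        return out;
    }
}
```

For the record "1,alice,30" and the projection {2, 0}, both formats yield ["30", "1"]; the difference is only where the projection is applied.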

  was:
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface 
does not have clear requirements, and it's confusing for implementers. In particular, 
it's unclear whether the format needs to support projection push down or not, 
and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected 
and includes partition keys or not. An example of such a misunderstanding is 
shown here: 
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107

The PR https://github.com/apache/flink/pull/17544 partially addresses the 
issue, because it removes the need for BulkFormat implementations to take care 
of partition key handling. Nevertheless, it's still unclear whether formats 
support projections and whether they support nested projections.

We should refactor {{DecodingFormat}} as follows:

* We document that every {{DecodingFormat}} *MUST* support projections. This is 
already the case for every format we have (see 
https://github.com/apache/flink/pull/17544#issuecomment-953184692). A 
{{DecodingFormat}} *MAY* also support nested projections, and this is signaled 
by a new method {{DecodingFormat#supportsNestedProjection()}}
* Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context, 
DataType physicalDataType, int[][] projections)}} that users should now 
implement. The {{physicalDataType}} in this signature is the physical data type 
from the table schema stripped of metadata columns and partition keys, with 
fields in the order defined by the table schema. The user can compute the final 
type with {{DataType.projectFields(physicalDataType, projections)}}
* Deprecate the old {{createRuntimeDecoder}}
* Provide default implementations of both the new and the old {{createRuntimeDecoder}} 
to ensure backward compatibility.
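To make the {{int[][] projections}} argument concrete: each inner array is an index path into {{physicalDataType}}, so {1, 0} selects the first field of the second top-level field. The sketch below resolves such paths against a toy schema and returns the names of the projected fields, loosely mirroring what {{DataType.projectFields(physicalDataType, projections)}} is described as doing. {{Field}}, {{TypeProjection.projectNames}} and the schema are hypothetical, and rendering a nested path as "outer.inner" is a choice of this sketch, not necessarily how Flink names projected nested fields.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical schema node: a named field, optionally with nested children (a ROW type). */
class Field {
    final String name;
    final List<Field> children;

    Field(String name, Field... children) {
        this.name = name;
        this.children = List.of(children); // empty for atomic fields
    }
}

class TypeProjection {
    /**
     * Resolves each index path against the physical fields and returns one
     * top-level name per path; nested paths are rendered as "outer.inner".
     */
    static List<String> projectNames(List<Field> physicalFields, int[][] projections) {
        List<String> result = new ArrayList<>();
        for (int[] path : projections) {
            if (path.length == 0) {
                throw new IllegalArgumentException("empty index path");
            }
            List<Field> level = physicalFields;
            StringBuilder name = new StringBuilder();
            for (int depth = 0; depth < path.length; depth++) {
                Field f = level.get(path[depth]); // throws if the index is out of range
                if (depth > 0) {
                    name.append('.');
                }
                name.append(f.name);
                level = f.children; // descend into the nested row
            }
            result.add(name.toString());
        }
        return result;
    }
}
```

For a physical type (id, user ROW<name, age>, ts) and projections {{2}, {1, 0}}, this returns [ts, user.name]. A format whose {{supportsNestedProjection()}} returns false would only ever receive paths of length one.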

As an alternative, we ([~twalthr] and I) explored the idea that formats might not 
support projection push down, but this is very unlikely, and such a change would 
require several planner changes, including breaking the 
{{SupportsProjectionPushDown}} interface.

We should also provide a {{RowData}} implementation that takes care of 
projection internally, so that a {{DecodingFormat}} implementer who doesn't want 
to support projections can simply wrap the row it produces, like: {{new 
ProjectedRowData(rowDataProducedByFormat, projections)}}.
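A wrapper in the spirit of {{new ProjectedRowData(rowDataProducedByFormat, projections)}} can be sketched as a read-only view that translates each projected position into an index path on the underlying row, without copying it. This is not Flink's {{RowData}} API: {{Row}}, {{ArrayRow}}, {{ProjectedRow}} and its {{replaceRow}} method are hypothetical names, and nested rows are modeled as nested {{Row}} instances.

```java
/** Hypothetical minimal row abstraction (a tiny subset of what RowData offers). */
interface Row {
    int getArity();

    Object getField(int pos);
}

/** Row backed by an array; nested rows are stored as nested Row instances. */
class ArrayRow implements Row {
    private final Object[] values;

    ArrayRow(Object... values) {
        this.values = values;
    }

    public int getArity() {
        return values.length;
    }

    public Object getField(int pos) {
        return values[pos];
    }
}

/** Read-only view that exposes only the projected fields of a wrapped row. */
class ProjectedRow implements Row {
    private final int[][] projections;
    private Row row;

    ProjectedRow(Row row, int[][] projections) {
        this.row = row;
        this.projections = projections;
    }

    /** Lets one wrapper instance be reused for every decoded record. */
    ProjectedRow replaceRow(Row newRow) {
        this.row = newRow;
        return this;
    }

    public int getArity() {
        return projections.length;
    }

    public Object getField(int pos) {
        int[] path = projections[pos];
        Row current = row;
        // Walk all but the last index through nested rows.
        for (int i = 0; i < path.length - 1; i++) {
            current = (Row) current.getField(path[i]);
        }
        return current.getField(path[path.length - 1]);
    }
}
```

Wrapping the row (1, ("alice", 30), 1000L) with projections {{2}, {1, 0}} exposes 1000L at position 0 and "alice" at position 1. Because the view only redirects reads, the planner sees the projected layout even though the format still decodes the full physical row.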


> Clarify semantics of DecodingFormat and its data type
> -----------------------------------------------------
>
>                 Key: FLINK-24776
>                 URL: https://issues.apache.org/jira/browse/FLINK-24776
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Francesco Guardiani
>            Priority: Major
>              Labels: pull-request-available
>
> Today the {{org.apache.flink.table.connector.format.DecodingFormat}} 
> interface does not have clear requirements, and it's confusing for implementers. In 
> particular, it's unclear whether the format needs to support projection push 
> down or not, and whether the {{DataType}} provided to 
> {{createRuntimeDecoder}} is projected and includes partition keys or not. An 
> example of such a misunderstanding is shown here: 
> https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107
> The PR https://github.com/apache/flink/pull/17544 partially addresses the 
> issue, because it removes the need for BulkFormat implementations to take 
> care of partition key handling. Nevertheless, it's still unclear whether 
> formats support projections and whether they support nested projections.
> We should refactor {{DecodingFormat}} as follows:
> - Clarify DecodingFormat and introduce ProjectableDecodingFormat.
> - Introduce ProjectedRowData and Projection to simplify implementations of 
> connectors that need to deal with projections.
> - Apply the changes to most of the formats and connectors we have.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
