[ https://issues.apache.org/jira/browse/FLINK-24776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther updated FLINK-24776:
---------------------------------
Description:
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface
does not have clear requirements, which is confusing for implementers. In
particular, it's unclear whether a format needs to support projection push down,
and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected
and includes partition keys. An example of such a misunderstanding is shown
here:
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107
The PR https://github.com/apache/flink/pull/17544 partially addresses the
issue, because it removes the need for BulkFormat implementations to handle
partition keys. Nevertheless, it's still unclear whether formats support
projections and, if so, whether they support nested projections.
We should refactor {{DecodingFormat}} as follows:
- Clarify DecodingFormat and introduce ProjectableDecodingFormat.
- Introduce ProjectedRowData and Projection to simplify implementations of
connectors that need to deal with projections.
- Apply the changes to most of the formats and connectors we have.
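To illustrate what {{int[][]}} projection paths express, here is a hypothetical sketch; {{ProjectionSketch}} and its simplified {{Field}} schema model are assumptions for illustration, not Flink APIs. Each inner array is a path of field indexes, so [2, 1] selects field 1 of the nested row at top-level position 2, and any path longer than one element is a nested projection:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: Field is a simplified stand-in for a (possibly nested) row type, NOT Flink's DataType.
public class ProjectionSketch {

    /** A field of a row type; children is non-empty only for nested ROW fields. */
    static final class Field {
        final String name;
        final List<Field> children;

        Field(String name, List<Field> children) {
            this.name = name;
            this.children = children;
        }

        static Field leaf(String name) {
            return new Field(name, List.of());
        }

        static Field row(String name, Field... children) {
            return new Field(name, List.of(children));
        }
    }

    /**
     * Applies index paths to the top-level fields. Each path selects one (possibly nested) field;
     * the result is flat, in path order, and named with dotted paths (a choice of this sketch).
     */
    static List<String> project(List<Field> fields, int[][] projections) {
        List<String> result = new ArrayList<>();
        for (int[] path : projections) {
            List<Field> level = fields;
            StringBuilder name = new StringBuilder();
            for (int i = 0; i < path.length; i++) {
                Field f = level.get(path[i]); // throws IndexOutOfBoundsException on an invalid path
                if (i > 0) {
                    name.append('.');
                }
                name.append(f.name);
                level = f.children;
            }
            result.add(name.toString());
        }
        return result;
    }

    /** A projection is nested as soon as one path has more than one element. */
    static boolean isNested(int[][] projections) {
        return Arrays.stream(projections).anyMatch(p -> p.length > 1);
    }

    public static void main(String[] args) {
        List<Field> physical = List.of(
                Field.leaf("id"),
                Field.leaf("name"),
                Field.row("address", Field.leaf("street"), Field.leaf("city")));
        int[][] projections = {{2, 1}, {0}};
        System.out.println(project(physical, projections)); // [address.city, id]
        System.out.println(isNested(projections)); // true
    }
}
```

Flattening nested paths into dotted names is only one possible convention; how Flink names or nests projected fields is not specified here.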
was:
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface
does not have clear requirements, which is confusing for implementers. In
particular, it's unclear whether a format needs to support projection push down,
and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected
and includes partition keys. An example of such a misunderstanding is shown
here:
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107
The PR https://github.com/apache/flink/pull/17544 partially addresses the
issue, because it removes the need for BulkFormat implementations to handle
partition keys. Nevertheless, it's still unclear whether formats support
projections and, if so, whether they support nested projections.
We should refactor {{DecodingFormat}} as follows:
* Document that every {{DecodingFormat}} *MUST* support projections. This is
already the case for every format we have (see
https://github.com/apache/flink/pull/17544#issuecomment-953184692). A
{{DecodingFormat}} *MAY* also support nested projections, which it signals
through a new method {{DecodingFormat#supportsNestedProjection()}}.
* Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context,
DataType physicalDataType, int[][] projections)}} that implementers should now
implement. The {{physicalDataType}} in this signature is the physical data type
from the table schema, stripped of metadata columns and partition keys, with
fields in the order defined by the table schema. Implementers can compute the
final projected type with {{DataType.projectFields(physicalDataType, projections)}}.
* Deprecate the old {{createRuntimeDecoder}}
* Provide default implementations of both the new and the old
{{createRuntimeDecoder}} to ensure backward compatibility.
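One way to read the backward-compatibility point is as two default methods that delegate to each other. The sketch below is hypothetical: {{Format}}, {{Decoder}}, the helper {{project}} and the use of {{List<String>}} in place of {{DataType}} are simplifications, not Flink's actual signatures. A legacy format that only overrides the deprecated method and an updated format that only overrides the new one both work through either entry point; a format that overrides neither would recurse forever, so at least one method must be implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: Format, Decoder and List<String> (standing in for DataType) are NOT Flink's API.
public class DecoderCompatSketch {

    /** Stand-in for a runtime decoder; it only records the fields it would produce. */
    static final class Decoder {
        final List<String> producedFields;

        Decoder(List<String> producedFields) {
            this.producedFields = producedFields;
        }
    }

    /** Top-level projection of a field list; nested paths are rejected in this sketch. */
    static List<String> project(List<String> fields, int[][] projections) {
        List<String> out = new ArrayList<>();
        for (int[] path : projections) {
            if (path.length != 1) {
                throw new IllegalArgumentException("nested projections are not handled in this sketch");
            }
            out.add(fields.get(path[0]));
        }
        return out;
    }

    interface Format {
        /** Old entry point: receives the already-projected fields. */
        @Deprecated
        default Decoder createRuntimeDecoder(List<String> producedFields) {
            // Delegate to the new method with an identity projection over all top-level fields.
            int[][] identity = new int[producedFields.size()][];
            for (int i = 0; i < identity.length; i++) {
                identity[i] = new int[] {i};
            }
            return createRuntimeDecoder(producedFields, identity);
        }

        /** New entry point: physical fields in schema order plus projection paths. */
        default Decoder createRuntimeDecoder(List<String> physicalFields, int[][] projections) {
            // Delegate to the old method with the projected fields.
            // If an implementation overrides neither method, the two defaults recurse forever.
            return createRuntimeDecoder(project(physicalFields, projections));
        }

        default boolean supportsNestedProjection() {
            return false;
        }
    }

    /** Legacy format: only overrides the deprecated method. */
    static final class LegacyFormat implements Format {
        @Override
        public Decoder createRuntimeDecoder(List<String> producedFields) {
            return new Decoder(producedFields);
        }
    }

    /** Updated format: only overrides the new method. */
    static final class ProjectingFormat implements Format {
        @Override
        public Decoder createRuntimeDecoder(List<String> physicalFields, int[][] projections) {
            return new Decoder(project(physicalFields, projections));
        }
    }

    public static void main(String[] args) {
        List<String> physical = List.of("id", "name", "city");
        int[][] projections = {{2}, {0}};
        // The caller uses the new entry point regardless of which method a format overrides.
        System.out.println(new LegacyFormat().createRuntimeDecoder(physical, projections).producedFields);
        System.out.println(new ProjectingFormat().createRuntimeDecoder(physical, projections).producedFields);
    }
}
```

Both calls in {{main}} produce the fields [city, id], which is the property the default implementations are meant to preserve.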
As an alternative, we ([~twalthr] and I) explored the idea that formats might not
support projection push down, but this is very unlikely, and such a change
would require several planner changes, including breaking the
{{SupportsProjectionPushDown}} interface.
We should also provide a {{RowData}} implementation that takes care of
projection internally, so that a {{DecodingFormat}} implementer who doesn't want
to handle projections can simply wrap its output like this: {{new
ProjectedRowData(rowDataProducedByFormat, projections)}}.
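A minimal sketch of such a wrapper, assuming a simplified {{Row}} interface in place of Flink's {{RowData}} (whose typed per-field getters are omitted here) and supporting only top-level projections. The names {{Row}}, {{ArrayRow}} and {{ProjectedRow}} are hypothetical and are not the eventual {{ProjectedRowData}} API:

```java
// Hypothetical sketch: Row, ArrayRow and ProjectedRow are simplified stand-ins, NOT Flink classes.
public class ProjectedRowSketch {

    /** Minimal row abstraction standing in for RowData. */
    interface Row {
        int getArity();

        Object getField(int pos);
    }

    /** Array-backed row, standing in for the full row a format produces. */
    static final class ArrayRow implements Row {
        private final Object[] values;

        ArrayRow(Object... values) {
            this.values = values;
        }

        public int getArity() {
            return values.length;
        }

        public Object getField(int pos) {
            return values[pos];
        }
    }

    /** View over an underlying row that exposes only the projected top-level fields. */
    static final class ProjectedRow implements Row {
        private final Row underlying;
        private final int[] indexMapping; // projected position -> underlying position

        ProjectedRow(Row underlying, int[][] projections) {
            this.underlying = underlying;
            this.indexMapping = new int[projections.length];
            for (int i = 0; i < projections.length; i++) {
                if (projections[i].length != 1) {
                    throw new IllegalArgumentException("only top-level projections are handled in this sketch");
                }
                indexMapping[i] = projections[i][0];
            }
        }

        public int getArity() {
            return indexMapping.length;
        }

        public Object getField(int pos) {
            // Remap on access instead of copying the field values.
            return underlying.getField(indexMapping[pos]);
        }
    }

    public static void main(String[] args) {
        Row full = new ArrayRow(42L, "Alice", "Berlin");
        Row projected = new ProjectedRow(full, new int[][] {{2}, {0}});
        System.out.println(projected.getArity()); // 2
        System.out.println(projected.getField(0)); // Berlin
        System.out.println(projected.getField(1)); // 42
    }
}
```

Because the wrapper only remaps positions on access, a format can keep producing its full row and leave the projection to the wrapper.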
> Clarify semantics of DecodingFormat and its data type
> -----------------------------------------------------
>
> Key: FLINK-24776
> URL: https://issues.apache.org/jira/browse/FLINK-24776
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Francesco Guardiani
> Priority: Major
> Labels: pull-request-available
--
This message was sent by Atlassian Jira
(v8.20.1#820001)