Francesco Guardiani created FLINK-24776:
-------------------------------------------
Summary: Clarify DecodingFormat
Key: FLINK-24776
URL: https://issues.apache.org/jira/browse/FLINK-24776
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Francesco Guardiani
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface
has no clear requirements, which is confusing for implementers. In particular,
it's unclear whether a format needs to support projection push-down, and
whether the {{DataType}} provided to {{createRuntimeDecoder}} is already
projected and whether it includes partition keys. An example of such a
misunderstanding is shown here:
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107
The PR https://github.com/apache/flink/pull/17544 partially addresses the
issue, because it removes the need for BulkFormat implementations to handle
partition keys. Nevertheless, it's still unclear whether formats support
projections and, if so, whether they support nested projections.
We should refactor {{DecodingFormat}} as follows:
* We document that every {{DecodingFormat}} *MUST* support projections. This is
already the case for every format we have (see
https://github.com/apache/flink/pull/17544#issuecomment-953184692). A
{{DecodingFormat}} *MAY* also support nested projections, and this is signaled
by a new method {{DecodingFormat#supportsNestedProjection()}}
* Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context,
DataType physicalDataType, int[][] projections)}} that users should now
implement. The {{physicalDataType}} in this signature is the physical data type
from the table schema stripped of metadata columns and partition keys, with
fields in the order defined by the table schema. The user can compute the final
type with {{DataType.projectFields(physicalDataType, projections)}}
* Deprecate the old {{createRuntimeDecoder}}
* Provide default implementations of both the new and the old
{{createRuntimeDecoder}} to ensure backward compatibility.
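The backward-compatibility idea in the last bullet could be sketched roughly as follows. This is only an illustration under simplifying assumptions: {{DecodingFormatSketch}}, {{LegacyFormat}} and {{projectTopLevel}} are hypothetical names, a list of field names stands in for {{DataType}}, a {{String}} stands in for the runtime decoder, and the {{context}} parameter is omitted. It is not the actual Flink interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical stand-in for the proposed interface (not the Flink API). */
interface DecodingFormatSketch {

    /** Old-style method: receives the already projected fields. */
    @Deprecated
    default String createRuntimeDecoder(List<String> producedFields) {
        // Bridge old -> new: every field is selected, in its original order.
        int[][] identity = new int[producedFields.size()][];
        for (int i = 0; i < identity.length; i++) {
            identity[i] = new int[] {i};
        }
        return createRuntimeDecoder(producedFields, identity);
    }

    /** New-style method: receives the full physical fields plus the projections. */
    default String createRuntimeDecoder(List<String> physicalFields, int[][] projections) {
        // Bridge new -> old: project first, then call the legacy method.
        return createRuntimeDecoder(projectTopLevel(physicalFields, projections));
    }

    /** Formats opt in to nested projections; the default is top-level only. */
    default boolean supportsNestedProjection() {
        return false;
    }

    /** Applies top-level projections; each index path must have length 1. */
    static List<String> projectTopLevel(List<String> physicalFields, int[][] projections) {
        List<String> projected = new ArrayList<>();
        for (int[] path : projections) {
            if (path.length != 1) {
                throw new IllegalArgumentException("Nested projection is not handled in this sketch");
            }
            projected.add(physicalFields.get(path[0]));
        }
        return projected;
    }
}

/** A legacy format that only overrides the old method keeps working. */
final class LegacyFormat implements DecodingFormatSketch {
    @Override
    public String createRuntimeDecoder(List<String> producedFields) {
        return "decoder" + producedFields;
    }
}
```

Note that in this sketch the two defaults delegate to each other, so an implementation must override at least one of them; otherwise the calls recurse forever.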
As an alternative, we ([~twalthr] and I) explored the idea that formats might
not support projection push-down, but this is very unlikely, and such a change
would require several planner changes, including breaking the interface
{{SupportsProjectionPushDown}}.
We should also provide a {{RowData}} implementation that takes care of
projection internally, so that a {{DecodingFormat}} implementer who doesn't
want to implement projection in the format itself can just use this
{{RowData}} wrapper, like:
{{new ProjectedRowData(rowDataProducedByFormat, projections)}}.
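A minimal sketch of how such a wrapper might work, assuming top-level projections only. {{RowSketch}}, {{ArrayRow}} and {{ProjectedRowSketch}} are hypothetical stand-ins used for illustration; the real {{RowData}} interface has typed accessors that are not reproduced here.

```java
/** Hypothetical, simplified stand-in for a row abstraction (not Flink's RowData). */
interface RowSketch {
    int getArity();

    Object getField(int pos);
}

/** Plain row backed by an array, as a format might produce it. */
final class ArrayRow implements RowSketch {
    private final Object[] fields;

    ArrayRow(Object... fields) {
        this.fields = fields;
    }

    @Override
    public int getArity() {
        return fields.length;
    }

    @Override
    public Object getField(int pos) {
        return fields[pos];
    }
}

/** View over another row that exposes only the projected fields, without copying. */
final class ProjectedRowSketch implements RowSketch {
    private final RowSketch row;
    private final int[] indexMapping;

    ProjectedRowSketch(RowSketch row, int[][] projections) {
        this.row = row;
        this.indexMapping = new int[projections.length];
        for (int i = 0; i < projections.length; i++) {
            if (projections[i].length != 1) {
                throw new IllegalArgumentException("Nested projection is not handled in this sketch");
            }
            indexMapping[i] = projections[i][0];
        }
    }

    @Override
    public int getArity() {
        return indexMapping.length;
    }

    @Override
    public Object getField(int pos) {
        // Translate the projected position into the position in the wrapped row.
        return row.getField(indexMapping[pos]);
    }
}
```

With this approach the format keeps decoding full rows and the wrapper remaps field positions on access, which trades some decoding work for a much simpler implementation.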