[ https://issues.apache.org/jira/browse/FLINK-24776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther updated FLINK-24776:
---------------------------------
    Description:
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface has unclear requirements, which is confusing for implementers. In particular, it's unclear whether a format needs to support projection push down, and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected and includes partition keys.

An example of such a misunderstanding is shown here: https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107

The PR https://github.com/apache/flink/pull/17544 partially addresses the issue, because it removes the need for BulkFormat implementations to handle partition keys. Nevertheless, it's still unclear whether formats support projections and whether they support nested projections.

We should refactor {{DecodingFormat}} as follows:
- Clarify DecodingFormat and introduce ProjectableDecodingFormat.
- Introduce ProjectedRowData and Projection to simplify the implementation of connectors that need to deal with projections.
- Apply the changes to most of the formats and connectors we have.

  was:
Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface has unclear requirements, which is confusing for implementers. In particular, it's unclear whether a format needs to support projection push down, and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected and includes partition keys.

An example of such a misunderstanding is shown here: https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107

The PR https://github.com/apache/flink/pull/17544 partially addresses the issue, because it removes the need for BulkFormat implementations to handle partition keys. Nevertheless, it's still unclear whether formats support projections and whether they support nested projections.

We should refactor {{DecodingFormat}} as follows:
* We document that every {{DecodingFormat}} *MUST* support projections. This is already the case for every format we have (see https://github.com/apache/flink/pull/17544#issuecomment-953184692). A {{DecodingFormat}} *MAY* also support nested projections, which is signaled by a new method {{DecodingFormat#supportsNestedProjection()}}.
* Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType, int[][] projections)}} that users should now implement. The {{physicalDataType}} in this signature is the physical data type from the table schema, stripped of metadata columns and partition keys, with fields in the order defined by the table schema. The user can compute the final type with {{DataType.projectFields(physicalDataType, projections)}}.
* Deprecate the old {{createRuntimeDecoder}}.
* Provide default implementations of both the new and the old {{createRuntimeDecoder}} to ensure backward compatibility.

As an alternative, we ([~twalthr] and I) explored the idea that formats might not support projection push down, but this is very unlikely, and such a change would require several planner changes, including breaking the interface {{SupportsProjectionPushDown}}.
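
To illustrate the {{int[][] projections}} argument mentioned above, here is a minimal, self-contained sketch. It assumes that each inner array is an index path into the physical fields (one index for a top-level field, several for a nested field), as is the case for {{SupportsProjectionPushDown}}. The names {{ProjectionSketch}}, {{Field}} and {{project}} are hypothetical and exist only for this illustration; they are not Flink API and do not reproduce {{DataType.projectFields}}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: none of these types are part of Flink. */
public class ProjectionSketch {

    /** Hypothetical stand-in for a schema field: a name plus optional nested fields. */
    static final class Field {
        final String name;
        final List<Field> children;

        Field(String name, Field... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /**
     * Resolves every index path against the physical fields and returns the
     * names of the selected fields, in the order given by the projection.
     */
    static List<String> project(List<Field> physicalFields, int[][] projections) {
        List<String> result = new ArrayList<>();
        for (int[] path : projections) {
            if (path.length == 0) {
                throw new IllegalArgumentException("Empty index path");
            }
            List<Field> level = physicalFields;
            Field current = null;
            for (int index : path) {
                current = level.get(index); // descend one nesting level per index
                level = current.children;
            }
            result.add(current.name);
        }
        return result;
    }

    public static void main(String[] args) {
        // Physical schema: id, name, address(street, city)
        List<Field> schema = Arrays.asList(
                new Field("id"),
                new Field("name"),
                new Field("address", new Field("street"), new Field("city")));

        // Select the nested field address.city first, then the top-level field id.
        int[][] projections = {{2, 1}, {0}};
        System.out.println(project(schema, projections)); // prints [city, id]
    }
}
```

In this sketch, a format that only supports top-level projections would see paths of length one, while a path such as {{{2, 1}}} is only meaningful for a format that supports nested projections.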

We should also provide a {{RowData}} implementation that takes care of projection internally, so that a {{DecodingFormat}} implementer who doesn't want to implement projection handling itself can just use this {{RowData}} wrapper, like: {{new ProjectedRowData(rowDataProducedByFormat, projections)}}.


> Clarify semantics of DecodingFormat and its data type
> -----------------------------------------------------
>
> Key: FLINK-24776
> URL: https://issues.apache.org/jira/browse/FLINK-24776
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Francesco Guardiani
> Priority: Major
> Labels: pull-request-available

--
This message was sent by Atlassian Jira
(v8.20.1#820001)