This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac203a7a26d78fe270bd9941bb721be53a893040 Author: slinkydeveloper <[email protected]> AuthorDate: Wed Sep 29 17:49:17 2021 +0200 [FLINK-24399][table-common] Add DynamicTableFactory.Context#getPrimaryKeyIndexes and DynamicTableFactory.Context#getPhysicalRowDataType Signed-off-by: slinkydeveloper <[email protected]> This closes #17381. --- .../flink/table/factories/DynamicTableFactory.java | 45 ++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java index 3b87eff..7fb880a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.types.DataType; /** * Base interface for configuring a dynamic table connector for an external storage system from @@ -85,5 +86,49 @@ public interface DynamicTableFactory extends Factory { /** Whether the table is temporary. */ boolean isTemporary(); + + /** + * Returns the physical schema to use for encoding and decoding records. The returned row + * data type contains only physical columns. It does not include computed or metadata + * columns. A factory can use the returned data type to configure the table connector, and + * can manipulate it using the {@link DataType} static methods: + * + * <pre>{@code + * // Project some fields into a new data type + * DataType projectedDataType = DataType.projectRow( + * context.getPhysicalRowDataType(), projectedIndexes); + * + * // Create key data type + * DataType keyDataType = DataType.projectRow( + * context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); + * + * // Create a new data type filtering columns of the original data type + * DataType myOwnDataType = DataTypes.ROW( + * DataType.getFields(context.getPhysicalRowDataType()) + * .stream() + * .filter(myFieldFilterPredicate) + * .toArray(DataTypes.Field[]::new)) + * }</pre> + * + * <p>Shortcut for {@code getCatalogTable().getResolvedSchema().toPhysicalRowDataType()}. + * + * @see ResolvedSchema#toPhysicalRowDataType() + */ + default DataType getPhysicalRowDataType() { + return getCatalogTable().getResolvedSchema().toPhysicalRowDataType(); + } + + /** + * Returns the primary key indexes, if any, otherwise returns an empty array. A factory can + * use it to compute the schema projection of the key fields with {@link + * DataType#projectFields(DataType, int[])}. + * + * <p>Shortcut for {@code getCatalogTable().getResolvedSchema().getPrimaryKeyIndexes()}. + * + * @see ResolvedSchema#getPrimaryKeyIndexes() + */ + default int[] getPrimaryKeyIndexes() { + return getCatalogTable().getResolvedSchema().getPrimaryKeyIndexes(); + } } }
