[
https://issues.apache.org/jira/browse/FLINK-29267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603592#comment-17603592
]
Timo Walther commented on FLINK-29267:
--------------------------------------
> "you could also consider what pg extensions need loading before these are
> available"
This is what I consider the long-term solution in the description above. At
some point, Flink will hopefully also support user-defined types (i.e. CREATE
TYPE) and we will be able to load the postgres types into Flink's type system.
But this is future work. For now, we need to remove the biggest obstacles
with a rather minimal but effective solution.
> "just let me cast the column in pg-specific code"
This is not an option, as we have to consider many different connectors and
formats, e.g. Avro within Kafka, or Debezium within Avro within Kafka.
> "DataTypes.OBJECT<UUID>(), x -> (UUID)x.getObject('column_name'))"
Ideally, a solution must be made available to both Java and pure SQL users.
> Support external type systems in DDL
> ------------------------------------
>
> Key: FLINK-29267
> URL: https://issues.apache.org/jira/browse/FLINK-29267
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC, Formats (JSON, Avro, Parquet, ORC,
> SequenceFile), Table SQL / Ecosystem
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
>
> Many connectors and formats require supporting external data types. Postgres
> users request UUID support, Avro users require enum support, etc.
> FLINK-19869 implemented support for Postgres UUIDs poorly and even impacts
> performance with regular strings.
> The long-term solution should be user-defined types in Flink. This is however
> a bigger effort that requires a FLIP and a bigger amount of resources.
> As a mid-term solution, we should offer a consistent approach based on DDL
> options that allow defining a mapping from Flink's type system to the
> external type system. I suggest the following:
> {code}
> CREATE TABLE MyTable (
>   ...
> ) WITH (
>   'mapping.data-types' = '<Flink field name>: <External field data type>'
> )
> {code}
> The option defines a mapping from a Flink column to an external data type.
> The external data type should be parsable from a string. This works for most
> connectors and formats (e.g. an Avro schema string).
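To make the proposal concrete, here is a minimal sketch of how a connector could parse such an option string into a column-to-type map. The class and method names are assumptions for illustration, not existing Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DataTypeMappingParser {

    // Parses a string like "uuid_col: uuid, point_col: point" into an
    // ordered map from Flink column name to external type name.
    // Hypothetical helper; the real implementation would live in the
    // connector's factory and validate against the table schema.
    public static Map<String, String> parse(String option) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String entry : option.split(",")) {
            String[] parts = entry.split(":", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Invalid mapping entry: " + entry);
            }
            mapping.put(parts[0].trim(), parts[1].trim());
        }
        return mapping;
    }
}
```

Keeping the option a plain string means the same syntax works for every connector and format without new DDL grammar.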
> Examples:
> {code}
> CREATE TABLE MyTable (
>   regular_col STRING,
>   uuid_col STRING,
>   point_col ARRAY<DOUBLE>,
>   box_col ARRAY<ARRAY<DOUBLE>>
> ) WITH (
>   'mapping.data-types' = 'uuid_col: uuid, point_col: point, box_col: box'
> )
> {code}
> We provide a table of supported mapping data types. E.g. the {{point}} type
> is always mapped to {{ARRAY<DOUBLE>}}. In general, we choose the Flink data
> type that comes closest to the required functionality.
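To illustrate the kind of conversion such a mapping implies on the connector side, here is a sketch that turns Postgres' textual {{point}} representation {{(x,y)}} into the {{ARRAY<DOUBLE>}} counterpart. This is a hypothetical helper, not existing Flink or JDBC connector code:

```java
public class PointConverter {

    // Converts Postgres' textual point form "(x,y)" into a double[] that
    // corresponds to Flink's ARRAY<DOUBLE>. Hypothetical helper for
    // illustration; real code would also handle NULLs and malformed input.
    public static double[] toDoubleArray(String pgPoint) {
        String trimmed = pgPoint.trim();
        if (!trimmed.startsWith("(") || !trimmed.endsWith(")")) {
            throw new IllegalArgumentException("Not a point literal: " + pgPoint);
        }
        String[] coords = trimmed.substring(1, trimmed.length() - 1).split(",");
        if (coords.length != 2) {
            throw new IllegalArgumentException("Not a point literal: " + pgPoint);
        }
        return new double[] {
            Double.parseDouble(coords[0].trim()),
            Double.parseDouble(coords[1].trim())
        };
    }
}
```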
> Future work:
> In theory, we could also offer a mapping of field names. It might be a
> requirement that a Flink column name differ from the external system's.
> {code}
> CREATE TABLE MyTable (
>   ...
> ) WITH (
>   'mapping.names' = '<Flink field name>: <External field name>'
> )
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)