This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 0842feea1 [FLINK-37959][postgres]Supports all pgsql field types like debezium (#4086)

0842feea1 is described below

commit 0842feea189e456a04cad0dfc2e3fa6efc1e47c9
Author: ouyangwulin <mr.art...@gmail.com>
AuthorDate: Wed Aug 20 19:46:41 2025 +0800

    [FLINK-37959][postgres]Supports all pgsql field types like debezium (#4086)
---
 .../connectors/pipeline-connectors/postgres.md | 317 +++++++++--
 .../connectors/pipeline-connectors/postgres.md | 305 +++++++++--
 .../postgres/source/PostgresEventDeserializer.java | 39 +-
 .../source/PostgresSchemaDataTypeInference.java | 4 +-
 .../postgres/utils/PostgresSchemaUtils.java | 21 +-
 .../postgres/utils/PostgresTypeUtils.java | 334 ++++++++----
 .../postgres/source/PostgresFullTypesITCase.java | 586 ++++++++++++++++++++-
 .../src/test/resources/ddl/column_type_test.sql | 99 +++-
 .../src/test/resources/ddl/decimal_mode_test.sql | 70 +++
 .../event/DebeziumEventDeserializationSchema.java | 48 +-
 .../event/DebeziumSchemaDataTypeInference.java | 8 +-
 11 files changed, 1623 insertions(+), 208 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md index 2ed47cfea..ca76905ae 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md @@ -290,7 +290,6 @@ pipeline: ## 数据类型映射 - <div class="wy-table-responsive"> <table class="colwidths-auto docutils"> <thead> @@ -300,6 +299,17 @@ pipeline: </tr> </thead> <tbody> + <tr> + <td> + BOOLEAN <br> + BIT(1) <br> + <td>BOOLEAN</td> + </tr> + <tr> + <td> + BIT( > 1) + <td>BYTES</td> + </tr> <tr> <td> SMALLINT<br> @@ -317,79 +327,290 @@ pipeline: <tr> <td> BIGINT<br> - BIGSERIAL</td> - <td>BIGINT</td> - </tr> - <tr> - <td>NUMERIC</td> - <td>DECIMAL(20, 0)</td> - </tr> - <tr> - <td>BIGINT</td> + BIGSERIAL<br> + OID<br> + </td> <td>BIGINT</td> </tr> 
<tr> <td> REAL<br> - FLOAT4</td> + FLOAT4 + </td> <td>FLOAT</td> </tr> - <tr> - <td> - FLOAT8<br> - DOUBLE PRECISION</td> - <td>DOUBLE</td> - </tr> - <tr> - <td> - NUMERIC(p, s)<br> - DECIMAL(p, s)</td> - <td>DECIMAL(p, s)</td> + <tr> + <td>NUMERIC</td> + <td>DECIMAL(38, 0)</td> </tr> <tr> - <td>BOOLEAN</td> - <td>BOOLEAN</td> + <td>DOUBLE PRECISION<br> + FLOAT8 + </td> + <td>DOUBLE</td> </tr> + <tr> + <td> CHAR[(M)]<br> + VARCHAR[(M)]<br> + CHARACTER[(M)]<br> + BPCHAR[(M)]<br> + CHARACTER VARYING[(M)] + </td> + <td>STRING</td> + </tr> <tr> - <td>DATE</td> - <td>DATE</td> + <td>TIMESTAMPTZ<br> + TIMESTAMP WITH TIME ZONE</td> + <td>ZonedTimestampType</td> </tr> <tr> - <td>TIME [(p)] [WITHOUT TIMEZONE]</td> - <td>TIME [(p)] [WITHOUT TIMEZONE]</td> + <td>INTERVAL [P]</td> + <td>BIGINT</td> </tr> <tr> - <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td> - <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td> + <td>INTERVAL [P]</td> + <td>STRING(when <code>debezium.interval.handling.mode</code> is set to string)</td> </tr> <tr> - <td> - CHAR(n)<br> - CHARACTER(n)<br> - VARCHAR(n)<br> - CHARACTER VARYING(n)</td> - <td>CHAR(n)</td> + <td>BYTEA</td> + <td>BYTES or STRING (when <code>debezium.binary.handling.mode</code> is set to base64 or base64-url-safe or hex)</td> </tr> <tr> <td> - TEXT</td> + JSON<br> + JSONB<br> + XML<br> + UUID<br> + POINT<br> + LTREE<br> + CITEXT<br> + INET<br> + INT4RANGE<br> + INT8RANGE<br> + NUMRANGE<br> + TSRANGE<br> + DATERANGE<br> + ENUM + </td> <td>STRING</td> </tr> - <tr> - <td>BYTEA</td> - <td>BYTES</td> - </tr> </tbody> </table> </div> -### 空间数据类型映射 -PostgreSQL通过PostGIS扩展支持空间数据类型: +### Temporal types Mapping +除了包含时区信息的 PostgreSQL 的 TIMESTAMPTZ 数据类型之外,其他时间类型如何映射取决于连接器配置属性 <code>debezium.time.precision.mode</code> 的值。以下各节将描述这些映射关系: +- debezium.time.precision.mode=adaptive +- debezium.time.precision.mode=adaptive_time_microseconds +- debezium.time.precision.mode=connect + +注意: 
受限当前CDC对时间类型的支持,<code>debezium.time.precision.mode</code>为adaptive或adaptive_time_microseconds或connect Time类型都转化为Integer类型,并精度为3,后续将进行完善。 - GEOMETRY(POINT, xx):表示使用笛卡尔坐标系的点,EPSG:xxx定义其坐标系统,适用于局部平面计算。 - GEOGRAPHY(MULTILINESTRING):以经纬度存储多条线串,基于球面模型,适合全球范围的空间分析。 +<u>debezium.time.precision.mode=adaptive</u> -前者用于小范围平面数据,后者用于大范围、需考虑地球曲率的地理数据。 +当<code>debezium.time.precision.mode</code>属性设置为默认的 adaptive(自适应)时,连接器会根据列的数据类型定义来确定字面类型和语义类型。这可以确保事件能够精确地表示数据库中的值。 +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + DATE + <td>DATE</td> + </tr> + <tr> + <td> + TIME([P]) + </td> + <td>INTEGER</td> + </tr> + <tr> + <td> + TIMESTAMP([P]) + </td> + <td>TIMESTAMP([P])</td> + </tr> + </tbody> +</table> +</div> + +### Decimal types Mapping +PostgreSQL 连接器配置属性 <code>debezium.decimal.handling.mode</code> 的设置决定了连接器如何映射十进制类型。 + +当 <code>debezium.decimal.handling.mode</code> 属性设置为 precise(精确)时,连接器会对所有 DECIMAL、NUMERIC 和 MONEY 列使用 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 逻辑类型。这是默认模式。 +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + NUMERIC[(M[,D])] + <td>DECIMAL[(M[,D])]</td> + </tr> + <tr> + <td> + NUMERIC + <td>DECIMAL(38,0)</td> + </tr> + <tr> + <td> + DECIMAL[(M[,D])] + <td>DECIMAL[(M[,D])]</td> + </tr> + <tr> + <td> + DECIMAL + <td>DECIMAL(38,0)</td> + </tr> + <tr> + <td> + MONEY[(M[,D])] + <td>DECIMAL(38,digits)(schema 参数 scale 包含一个整数,表示小数点移动了多少位。scale schema 参数由 money.fraction.digits 连接器配置属性决定。)</td> + </tr> + </tbody> +</table> 
+</div> + +当 <code>debezium.decimal.handling.mode</code> 属性设置为 double 时,连接器将所有 DECIMAL、NUMERIC 和 MONEY 值表示为 Java 的 double 值,并按照下表所示进行编码。 + +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + NUMERIC[(M[,D])] + <td>DOUBLE</td> + </tr> + <tr> + <td> + DECIMAL[(M[,D])] + <td>DOUBLE</td> + </tr> + <tr> + <td> + MONEY[(M[,D])] + <td>DOUBLE</td> + </tr> + </tbody> +</table> +</div> + +<code>debezium.decimal.handling.mode</code> 配置属性的最后一个可选设置是 string(字符串)。在这种情况下,连接器将 DECIMAL、NUMERIC 和 MONEY 值表示为其格式化的字符串形式,并按照下表所示进行编码。 +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + NUMERIC[(M[,D])] + <td>STRING</td> + </tr> + <tr> + <td> + DECIMAL[(M[,D])] + <td>STRING</td> + </tr> + <tr> + <td> + MONEY[(M[,D])] + <td>STRING</td> + </tr> + </tbody> +</table> +</div> + +当 <code>debezium.decimal.handling.mode</code> 的设置为 string 或 double 时,PostgreSQL 支持将 NaN(非数字)作为一个特殊值存储在 DECIMAL/NUMERIC 值中。在这种情况下,连接器会将 NaN 编码为 Double.NaN 或字符串常量 NAN。 + +### HSTORE type Mapping +PostgreSQL 连接器配置属性 <code>debezium.hstore.handling.mode</code> 的设置决定了连接器如何映射 HSTORE 值。 + +当 <code>debezium.hstore.handling.mode</code> 属性设置为 json(默认值)时,连接器将 HSTORE 值表示为 JSON 值的字符串形式,并按照下表所示进行编码。当 <code>debezium.hstore.handling.mode</code> 属性设置为 map 时,连接器对 HSTORE 值使用 MAP 模式类型。 +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th 
class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + HSTORE + <td>STRING (<code>debezium.hstore.handling.mode=json</code>)</td> + </tr> + <tr> + <td> + HSTORE + <td>MAP (<code>debezium.hstore.handling.mode=map</code>)</td> + </tr> + </tbody> +</table> +</div> + +### Network address types Mapping +PostgreSQL 拥有可以存储 IPv4、IPv6 和 MAC 地址的数据类型。使用这些类型来存储网络地址比使用纯文本类型更为合适。网络地址类型提供了输入错误检查以及专用的操作符和函数。 +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + INET + <td>STRING</td> + </tr> + <tr> + <td> + CIDR + <td>STRING</td> + </tr> + <tr> + <td> + MACADDR + <td>STRING</td> + </tr> + <tr> + <td> + MACADDR8 + <td>STRING</td> + </tr> + </tbody> +</table> +</div> + +### PostGIS Types Mapping +PostgreSQL 通过 PostGIS 扩展支持空间数据类型: +``` + GEOMETRY(POINT, xx): 在笛卡尔坐标系中表示一个点,其中 EPSG:xx 定义了坐标系。它适用于局部平面计算。 + GEOGRAPHY(MULTILINESTRING): 在基于球面模型的纬度和经度上存储多条线串。它适用于全球范围的空间分析。 +``` +前者适用于小范围的平面数据,而后者适用于需要考虑地球曲率的大范围数据。 <div class="wy-table-responsive"> <table class="colwidths-auto docutils"> <thead> @@ -401,11 +622,11 @@ PostgreSQL通过PostGIS扩展支持空间数据类型: <tbody> <tr> <td>GEOMETRY(POINT, xx)</td> - <td>{"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187}</td> + <td>{"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}</td> </tr> <tr> <td>GEOGRAPHY(MULTILINESTRING)</td> - <td>{"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326}</td> + <td>{"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}</td> </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md
b/docs/content/docs/connectors/pipeline-connectors/postgres.md index 758002878..e52fa34bb 100644 --- a/docs/content/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md @@ -285,7 +285,6 @@ Notice: ## Data Type Mapping - <div class="wy-table-responsive"> <table class="colwidths-auto docutils"> <thead> @@ -295,6 +294,17 @@ Notice: </tr> </thead> <tbody> + <tr> + <td> + BOOLEAN <br> + BIT(1) <br> + <td>BOOLEAN</td> + </tr> + <tr> + <td> + BIT( > 1) + <td>BYTES</td> + </tr> <tr> <td> SMALLINT<br> @@ -312,69 +322,284 @@ Notice: <tr> <td> BIGINT<br> - BIGSERIAL</td> + BIGSERIAL<br> + OID<br> + </td> <td>BIGINT</td> </tr> - <tr> - <td>NUMERIC</td> - <td>DECIMAL(20, 0)</td> - </tr> <tr> <td> REAL<br> - FLOAT4</td> + FLOAT4 + </td> <td>FLOAT</td> </tr> - <tr> - <td> - FLOAT8<br> - DOUBLE PRECISION</td> - <td>DOUBLE</td> - </tr> - <tr> - <td> - NUMERIC(p, s)<br> - DECIMAL(p, s)</td> - <td>DECIMAL(p, s)</td> + <tr> + <td>NUMERIC</td> + <td>DECIMAL(38, 0)</td> </tr> <tr> - <td>BOOLEAN</td> - <td>BOOLEAN</td> + <td>DOUBLE PRECISION<br> + FLOAT8 + </td> + <td>DOUBLE</td> </tr> + <tr> + <td> CHAR[(M)]<br> + VARCHAR[(M)]<br> + CHARACTER[(M)]<br> + BPCHAR[(M)]<br> + CHARACTER VARYING[(M)] + </td> + <td>STRING</td> + </tr> <tr> - <td>DATE</td> - <td>DATE</td> + <td>TIMESTAMPTZ<br> + TIMESTAMP WITH TIME ZONE</td> + <td>ZonedTimestampType</td> </tr> <tr> - <td>TIME [(p)] [WITHOUT TIMEZONE]</td> - <td>TIME [(p)] [WITHOUT TIMEZONE]</td> + <td>INTERVAL [P]</td> + <td>BIGINT</td> </tr> <tr> - <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td> - <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td> + <td>INTERVAL [P]</td> + <td>STRING(when <code>debezium.interval.handling.mode</code> is set to string)</td> </tr> <tr> - <td> - CHAR(n)<br> - CHARACTER(n)<br> - VARCHAR(n)<br> - CHARACTER VARYING(n)</td> - <td>CHAR(n)</td> + <td>BYTEA</td> + <td>BYTES or STRING (when <code>debezium.binary.handling.mode</code> is set to base64 or base64-url-safe or 
hex)</td> </tr> <tr> <td> - TEXT</td> + JSON<br> + JSONB<br> + XML<br> + UUID<br> + POINT<br> + LTREE<br> + CITEXT<br> + INET<br> + INT4RANGE<br> + INT8RANGE<br> + NUMRANGE<br> + TSRANGE<br> + DATERANGE<br> + ENUM + </td> <td>STRING</td> </tr> - <tr> - <td>BYTEA</td> - <td>BYTES</td> - </tr> </tbody> </table> </div> -### Postgres Spatial Data Types Mapping +### Temporal types Mapping +Except for PostgreSQL’s TIMESTAMPTZ data type, which contains time zone information, the mapping of temporal types depends on the value of the <code>debezium.time.precision.mode</code> connector configuration property. The property accepts the following values: +- debezium.time.precision.mode=adaptive +- debezium.time.precision.mode=adaptive_time_microseconds +- debezium.time.precision.mode=connect + +Note: Due to current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is set to "adaptive", "adaptive_time_microseconds", or "connect", all TIME values are converted to the INTEGER type with a precision of 3. This will be improved in future updates. + +<u>debezium.time.precision.mode=adaptive</u> + +When the <code>debezium.time.precision.mode</code> property is set to adaptive (the default), the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.
+<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + DATE + <td>DATE</td> + </tr> + <tr> + <td> + TIME([P]) + </td> + <td>INTEGER</td> + </tr> + <tr> + <td> + TIMESTAMP([P]) + </td> + <td>TIMESTAMP([P])</td> + </tr> + </tbody> +</table> +</div> + +### Decimal types Mapping +The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types. + +When the <code>debezium.decimal.handling.mode</code> property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC and MONEY columns. This is the default mode. +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + NUMERIC[(M[,D])] + <td>DECIMAL[(M[,D])]</td> + </tr> + <tr> + <td> + NUMERIC + <td>DECIMAL(38,0)</td> + </tr> + <tr> + <td> + DECIMAL[(M[,D])] + <td>DECIMAL[(M[,D])]</td> + </tr> + <tr> + <td> + DECIMAL + <td>DECIMAL(38,0)</td> + </tr> + <tr> + <td> + MONEY[(M[,D])] + <td>DECIMAL(38,digits)(The scale schema parameter contains an integer representing how many digits the decimal point was shifted. 
The scale schema parameter is determined by the money.fraction.digits connector configuration property.)</td> + </tr> + </tbody> +</table> +</div> + +When the <code>debezium.decimal.handling.mode</code> property is set to double, the connector represents all DECIMAL, NUMERIC and MONEY values as Java double values and encodes them as shown in the following table. + +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + NUMERIC[(M[,D])] + <td>DOUBLE</td> + </tr> + <tr> + <td> + DECIMAL[(M[,D])] + <td>DOUBLE</td> + </tr> + <tr> + <td> + MONEY[(M[,D])] + <td>DOUBLE</td> + </tr> + </tbody> +</table> +</div> + +The last possible setting for the <code>debezium.decimal.handling.mode</code> configuration property is string. In this case, the connector represents DECIMAL, NUMERIC and MONEY values as their formatted string representation, and encodes them as shown in the following table. +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + NUMERIC[(M[,D])] + <td>STRING</td> + </tr> + <tr> + <td> + DECIMAL[(M[,D])] + <td>STRING</td> + </tr> + <tr> + <td> + MONEY[(M[,D])] + <td>STRING</td> + </tr> + </tbody> +</table> +</div> + +PostgreSQL can store NaN (not a number) as a special value in DECIMAL/NUMERIC columns. When <code>debezium.decimal.handling.mode</code> is set to double or string, the connector encodes NaN as Double.NaN or as the string constant NAN, respectively.
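To make the three decimal tables concrete, here is a minimal sketch of how a handling-mode value could select the CDC type for NUMERIC/DECIMAL and MONEY columns. It is not the connector's implementation: the class `DecimalMappingSketch`, the enum `DecimalMode`, and the methods `mapNumeric`, `mapMoney` and `parse` are hypothetical names. Type names are returned as plain strings for illustration. The sketch assumes that "precision <= 0" means the column was declared without precision, and that the `fractionDigits` argument stands in for the value of money.fraction.digits.

```java
import java.util.Locale;

/**
 * Hypothetical sketch (not Flink CDC code): selects a CDC type name for
 * NUMERIC/DECIMAL/MONEY columns according to the documented mapping tables.
 */
public class DecimalMappingSketch {

    /** Hypothetical stand-in for the three documented decimal handling modes. */
    enum DecimalMode { PRECISE, DOUBLE, STRING }

    /** Maximum precision used for the documented DECIMAL(38, 0) fallback. */
    static final int MAX_PRECISION = 38;

    /** Maps NUMERIC/DECIMAL(precision, scale); precision <= 0 means "declared without precision". */
    static String mapNumeric(int precision, int scale, DecimalMode mode) {
        switch (mode) {
            case DOUBLE:
                return "DOUBLE";
            case STRING:
                return "STRING";
            case PRECISE:
            default:
                if (precision > 0) {
                    return "DECIMAL(" + precision + ", " + scale + ")";
                }
                // An unconstrained NUMERIC falls back to DECIMAL(38, 0).
                return "DECIMAL(" + MAX_PRECISION + ", 0)";
        }
    }

    /** Maps MONEY; fractionDigits stands in for the money.fraction.digits setting. */
    static String mapMoney(int fractionDigits, DecimalMode mode) {
        if (mode == DecimalMode.PRECISE) {
            return "DECIMAL(" + MAX_PRECISION + ", " + fractionDigits + ")";
        }
        // In double and string modes MONEY maps the same way as NUMERIC.
        return mapNumeric(0, 0, mode);
    }

    /** Parses a debezium.decimal.handling.mode value such as "precise" (case-insensitive). */
    static DecimalMode parse(String value) {
        return DecimalMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(mapNumeric(10, 2, parse("precise"))); // DECIMAL(10, 2)
        System.out.println(mapNumeric(0, 0, parse("precise")));  // DECIMAL(38, 0)
        System.out.println(mapNumeric(10, 2, parse("double")));  // DOUBLE
        System.out.println(mapMoney(2, parse("string")));        // STRING
        System.out.println(mapMoney(2, parse("precise")));       // DECIMAL(38, 2)
    }
}
```

Only precise mode depends on the declared precision and scale. The double and string modes collapse every decimal-like column to a single type, which matches the tables above.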
+ +### HSTORE type Mapping +The setting of the PostgreSQL connector configuration property <code>debezium.hstore.handling.mode</code> determines how the connector maps HSTORE values. + +When the <code>debezium.hstore.handling.mode</code> property is set to json (the default), the connector represents each HSTORE value as a string containing a JSON object, as shown in the following table. When the <code>debezium.hstore.handling.mode</code> property is set to map, the connector uses the MAP schema type for HSTORE values. +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + HSTORE + <td>STRING (<code>debezium.hstore.handling.mode=json</code>)</td> + </tr> + <tr> + <td> + HSTORE + <td>MAP (<code>debezium.hstore.handling.mode=map</code>)</td> + </tr> + </tbody> +</table> +</div> + +### Network address types Mapping +PostgreSQL has data types that can store IPv4, IPv6, and MAC addresses. These types are preferable to plain text types for storing network addresses because they provide input error checking and specialized operators and functions.
+<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th> + <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th> + </tr> + </thead> + <tbody> + <tr> + <td> + INET + <td>STRING</td> + </tr> + <tr> + <td> + CIDR + <td>STRING</td> + </tr> + <tr> + <td> + MACADDR + <td>STRING</td> + </tr> + <tr> + <td> + MACADDR8 + <td>STRING</td> + </tr> + </tbody> +</table> +</div> + +### PostGIS Types Mapping PostgreSQL supports spatial data types through the PostGIS extension: ``` GEOMETRY(POINT, xx): Represents a point in a Cartesian coordinate system, with EPSG:xx defining the coordinate system. It is suitable for local planar calculations. @@ -392,11 +617,11 @@ The former is used for small-area planar data, while the latter is used for larg <tbody> <tr> <td>GEOMETRY(POINT, xx)</td> - <td>{"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187}</td> + <td>{"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}</td> </tr> <tr> <td>GEOGRAPHY(MULTILINESTRING)</td> - <td>{"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326}</td> + <td>{"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}</td> </tr> </tbody> </table> diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java index 560ef72d2..a37c35c52 100644 ---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java @@ -30,11 +30,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.debezium.data.Envelope; import io.debezium.data.geometry.Geography; import io.debezium.data.geometry.Geometry; -import io.debezium.util.HexConverter; +import io.debezium.data.geometry.Point; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.io.WKBReader; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -48,8 +51,6 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem private static final long serialVersionUID = 1L; private List<PostgreSQLReadableMetadata> readableMetadataList; - public static final String SRID = "srid"; - public static final String HEXEWKB = "hexewkb"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) { @@ -111,15 +112,37 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem protected Object convertToString(Object dbzObj, Schema schema) { // the Geometry datatype in PostgreSQL will be converted to // a String with Json format - if (Geometry.LOGICAL_NAME.equals(schema.name()) + if (Point.LOGICAL_NAME.equals(schema.name()) + || Geometry.LOGICAL_NAME.equals(schema.name()) || Geography.LOGICAL_NAME.equals(schema.name())) { try { Struct geometryStruct = (Struct) dbzObj; byte[] wkb = geometryStruct.getBytes("wkb"); - Optional<Integer> srid = 
Optional.ofNullable(geometryStruct.getInt32(SRID)); - Map<String, Object> geometryInfo = new HashMap<>(2); - geometryInfo.put(HEXEWKB, HexConverter.convertToHexString(wkb)); - geometryInfo.put(SRID, srid.orElse(0)); + + WKBReader wkbReader = new WKBReader(); + org.locationtech.jts.geom.Geometry jtsGeom = wkbReader.read(wkb); + + Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid")); + Map<String, Object> geometryInfo = new HashMap<>(); + String geometryType = jtsGeom.getGeometryType(); + geometryInfo.put("type", geometryType); + + if (geometryType.equals("GeometryCollection")) { + geometryInfo.put("geometries", jtsGeom.toText()); + } else { + Coordinate[] coordinates = jtsGeom.getCoordinates(); + List<double[]> coordinateList = new ArrayList<>(); + if (coordinates != null) { + for (Coordinate coordinate : coordinates) { + coordinateList.add(new double[] {coordinate.x, coordinate.y}); + geometryInfo.put( + "coordinates", new double[] {coordinate.x, coordinate.y}); + } + } + geometryInfo.put( + "coordinates", OBJECT_MAPPER.writeValueAsString(coordinateList)); + } + geometryInfo.put("srid", srid.orElse(0)); return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo)); } catch (Exception e) { throw new IllegalArgumentException( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java index e82e5e8ef..90a669780 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference; import io.debezium.data.geometry.Geography; import io.debezium.data.geometry.Geometry; +import io.debezium.data.geometry.Point; import org.apache.kafka.connect.data.Schema; /** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */ @@ -35,7 +36,8 @@ public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInfer protected DataType inferStruct(Object value, Schema schema) { // the Geometry datatype in PostgresSQL will be converted to // a String with Json format - if (Geography.LOGICAL_NAME.equals(schema.name()) + if (Point.LOGICAL_NAME.equals(schema.name()) + || Geography.LOGICAL_NAME.equals(schema.name()) || Geometry.LOGICAL_NAME.equals(schema.name())) { return DataTypes.STRING(); } else { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java index d2200531c..0f3b05806 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java @@ -23,9 +23,11 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import 
io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresObjectUtils; import io.debezium.connector.postgresql.PostgresSchema; import io.debezium.connector.postgresql.PostgresTopicSelector; +import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Table; @@ -165,16 +167,18 @@ public class PostgresSchemaUtils { topicSelector, valueConverterBuilder.build(jdbc.getTypeRegistry())); Table tableSchema = postgresSchema.tableFor(tableId); - return toSchema(tableSchema); + return toSchema( + tableSchema, sourceConfig.getDbzConnectorConfig(), jdbc.getTypeRegistry()); } catch (SQLException e) { throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e); } } - public static Schema toSchema(Table table) { + public static Schema toSchema( + Table table, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) { List<Column> columns = table.columns().stream() - .map(PostgresSchemaUtils::toColumn) + .map(column -> toColumn(column, dbzConfig, typeRegistry)) .collect(Collectors.toList()); return Schema.newBuilder() @@ -184,16 +188,21 @@ public class PostgresSchemaUtils { .build(); } - public static Column toColumn(io.debezium.relational.Column column) { + public static Column toColumn( + io.debezium.relational.Column column, + PostgresConnectorConfig dbzConfig, + TypeRegistry typeRegistry) { if (column.defaultValueExpression().isPresent()) { return Column.physicalColumn( column.name(), - PostgresTypeUtils.fromDbzColumn(column), + PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry), column.comment(), column.defaultValueExpression().get()); } else { return Column.physicalColumn( - column.name(), PostgresTypeUtils.fromDbzColumn(column), column.comment()); + column.name(), + PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry), + column.comment()); } 
} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java index 8af4057f9..0465563fb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java @@ -22,52 +22,24 @@ import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.table.types.logical.DecimalType; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.postgresql.PgOid; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.MONEY_FRACTION_DIGITS; + /** A utility class for converting Postgres types to Flink types. 
*/ public class PostgresTypeUtils { - private static final String PG_SMALLSERIAL = "smallserial"; - private static final String PG_SERIAL = "serial"; - private static final String PG_BIGSERIAL = "bigserial"; - private static final String PG_BYTEA = "bytea"; - private static final String PG_BYTEA_ARRAY = "_bytea"; - private static final String PG_SMALLINT = "int2"; - private static final String PG_SMALLINT_ARRAY = "_int2"; - private static final String PG_INTEGER = "int4"; - private static final String PG_INTEGER_ARRAY = "_int4"; - private static final String PG_BIGINT = "int8"; - private static final String PG_BIGINT_ARRAY = "_int8"; - private static final String PG_REAL = "float4"; - private static final String PG_REAL_ARRAY = "_float4"; - private static final String PG_DOUBLE_PRECISION = "float8"; - private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; - private static final String PG_NUMERIC = "numeric"; - private static final String PG_NUMERIC_ARRAY = "_numeric"; - private static final String PG_BOOLEAN = "bool"; - private static final String PG_BOOLEAN_ARRAY = "_bool"; - private static final String PG_TIMESTAMP = "timestamp"; - private static final String PG_TIMESTAMP_ARRAY = "_timestamp"; - private static final String PG_TIMESTAMPTZ = "timestamptz"; - private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz"; - private static final String PG_DATE = "date"; - private static final String PG_DATE_ARRAY = "_date"; - private static final String PG_TIME = "time"; - private static final String PG_TIME_ARRAY = "_time"; - private static final String PG_TEXT = "text"; - private static final String PG_TEXT_ARRAY = "_text"; - private static final String PG_CHAR = "bpchar"; - private static final String PG_CHAR_ARRAY = "_bpchar"; - private static final String PG_CHARACTER = "character"; - private static final String PG_CHARACTER_ARRAY = "_character"; - private static final String PG_CHARACTER_VARYING = "varchar"; - private static final String 
PG_CHARACTER_VARYING_ARRAY = "_varchar"; - private static final String PG_UUID = "uuid"; - private static final String PG_GEOMETRY = "geometry"; - private static final String PG_GEOGRAPHY = "geography"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ - public static DataType fromDbzColumn(Column column) { - DataType dataType = convertFromColumn(column); + public static DataType fromDbzColumn( + Column column, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) { + DataType dataType = convertFromColumn(column, dbzConfig, typeRegistry); if (column.isOptional()) { return dataType; } else { @@ -79,92 +51,258 @@ public class PostgresTypeUtils { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column) { - String typeName = column.typeName(); + private static DataType convertFromColumn( + Column column, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) { + int nativeType = column.nativeType(); int precision = column.length(); int scale = column.scale().orElse(0); - switch (typeName) { - case PG_BOOLEAN: + PostgresConnectorConfig.IntervalHandlingMode intervalHandlingMode = + PostgresConnectorConfig.IntervalHandlingMode.parse( + dbzConfig + .getConfig() + .getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE)); + + PostgresConnectorConfig.BinaryHandlingMode binaryHandlingMode = + dbzConfig.binaryHandlingMode(); + + TemporalPrecisionMode temporalPrecisionMode = dbzConfig.getTemporalPrecisionMode(); + + JdbcValueConverters.DecimalMode decimalMode = + dbzConfig.getDecimalMode() != null + ? 
dbzConfig.getDecimalMode() + : JdbcValueConverters.DecimalMode.PRECISE; + + PostgresConnectorConfig.HStoreHandlingMode hStoreHandlingMode = + PostgresConnectorConfig.HStoreHandlingMode.parse( + dbzConfig + .getConfig() + .getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE)); + + switch (nativeType) { + case PgOid.BOOL: return DataTypes.BOOLEAN(); - case PG_BOOLEAN_ARRAY: + case PgOid.BIT: + case PgOid.VARBIT: + if (precision == 1) { + return DataTypes.BOOLEAN(); + } else { + return DataTypes.BINARY(precision); + } + case PgOid.BOOL_ARRAY: return DataTypes.ARRAY(DataTypes.BOOLEAN()); - case PG_BYTEA: - return DataTypes.BYTES(); - case PG_BYTEA_ARRAY: - return DataTypes.ARRAY(DataTypes.BYTES()); - case PG_SMALLINT: - case PG_SMALLSERIAL: + case PgOid.BYTEA: + return handleBinaryWithBinaryMode(binaryHandlingMode); + case PgOid.BYTEA_ARRAY: + return DataTypes.ARRAY(handleBinaryWithBinaryMode(binaryHandlingMode)); + case PgOid.INT2: return DataTypes.SMALLINT(); - case PG_SMALLINT_ARRAY: + case PgOid.INT2_ARRAY: return DataTypes.ARRAY(DataTypes.SMALLINT()); - case PG_INTEGER: - case PG_SERIAL: + case PgOid.INT4: return DataTypes.INT(); - case PG_INTEGER_ARRAY: + case PgOid.INT4_ARRAY: return DataTypes.ARRAY(DataTypes.INT()); - case PG_BIGINT: - case PG_BIGSERIAL: + case PgOid.INT8: + case PgOid.OID: return DataTypes.BIGINT(); - case PG_BIGINT_ARRAY: + case PgOid.INTERVAL: + return handleIntervalWithIntervalHandlingMode(intervalHandlingMode); + case PgOid.INTERVAL_ARRAY: + return DataTypes.ARRAY( + handleIntervalWithIntervalHandlingMode(intervalHandlingMode)); + case PgOid.INT8_ARRAY: return DataTypes.ARRAY(DataTypes.BIGINT()); - case PG_REAL: + case PgOid.FLOAT4: return DataTypes.FLOAT(); - case PG_REAL_ARRAY: + case PgOid.FLOAT4_ARRAY: return DataTypes.ARRAY(DataTypes.FLOAT()); - case PG_DOUBLE_PRECISION: + case PgOid.FLOAT8: return DataTypes.DOUBLE(); - case PG_DOUBLE_PRECISION_ARRAY: + case PgOid.FLOAT8_ARRAY: return DataTypes.ARRAY(DataTypes.DOUBLE()); - case 
PG_NUMERIC: + case PgOid.NUMERIC: // see SPARK-26538: handle numeric without explicit precision and scale. - if (precision > 0) { - return DataTypes.DECIMAL(precision, scale); - } - return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0); - case PG_NUMERIC_ARRAY: + return handleNumericWithDecimalMode(precision, scale, decimalMode); + case PgOid.NUMERIC_ARRAY: // see SPARK-26538: handle numeric without explicit precision and scale. - if (precision > 0) { - return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale)); - } - return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0)); - case PG_CHAR: - case PG_CHARACTER: + return DataTypes.ARRAY(handleNumericWithDecimalMode(precision, scale, decimalMode)); + case PgOid.MONEY: + return handleMoneyWithDecimalMode( + dbzConfig.getConfig().getInteger(MONEY_FRACTION_DIGITS), decimalMode); + case PgOid.CHAR: + case PgOid.BPCHAR: return DataTypes.CHAR(precision); - case PG_CHAR_ARRAY: - case PG_CHARACTER_ARRAY: + case PgOid.CHAR_ARRAY: + case PgOid.BPCHAR_ARRAY: return DataTypes.ARRAY(DataTypes.CHAR(precision)); - case PG_CHARACTER_VARYING: + case PgOid.VARCHAR: return DataTypes.VARCHAR(precision); - case PG_CHARACTER_VARYING_ARRAY: + case PgOid.VARCHAR_ARRAY: return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); - case PG_TEXT: - case PG_GEOMETRY: - case PG_GEOGRAPHY: - case PG_UUID: + case PgOid.TEXT: + case PgOid.POINT: + case PgOid.UUID: + case PgOid.JSON: + case PgOid.JSONB: + case PgOid.XML: + case PgOid.INET_OID: + case PgOid.CIDR_OID: + case PgOid.MACADDR_OID: + case PgOid.MACADDR8_OID: + case PgOid.INT4RANGE_OID: + case PgOid.NUM_RANGE_OID: + case PgOid.INT8RANGE_OID: + case PgOid.TSRANGE_OID: + case PgOid.TSTZRANGE_OID: + case PgOid.DATERANGE_OID: return DataTypes.STRING(); - case PG_TEXT_ARRAY: + case PgOid.TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); - case PG_TIMESTAMP: - return DataTypes.TIMESTAMP(scale); - case PG_TIMESTAMP_ARRAY: - return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale)); - 
case PG_TIMESTAMPTZ: + case PgOid.TIMESTAMP: + return handleTimestampWithTemporalMode(temporalPrecisionMode, scale); + case PgOid.TIMESTAMP_ARRAY: + return DataTypes.ARRAY( + handleTimestampWithTemporalMode(temporalPrecisionMode, scale)); + case PgOid.TIMESTAMPTZ: return new ZonedTimestampType(scale); - case PG_TIMESTAMPTZ_ARRAY: + case PgOid.TIMESTAMPTZ_ARRAY: return DataTypes.ARRAY(new ZonedTimestampType(scale)); - case PG_TIME: - return DataTypes.TIME(scale); - case PG_TIME_ARRAY: - return DataTypes.ARRAY(DataTypes.TIME(scale)); - case PG_DATE: - return DataTypes.DATE(); - case PG_DATE_ARRAY: - return DataTypes.ARRAY(DataTypes.DATE()); + case PgOid.TIME: + return handleTimeWithTemporalMode(temporalPrecisionMode, scale); + case PgOid.TIME_ARRAY: + return DataTypes.ARRAY(handleTimeWithTemporalMode(temporalPrecisionMode, scale)); + case PgOid.DATE: + return handleDateWithTemporalMode(temporalPrecisionMode); + case PgOid.DATE_ARRAY: + return DataTypes.ARRAY(handleDateWithTemporalMode(temporalPrecisionMode)); default: + if (nativeType == typeRegistry.ltreeOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.geometryOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.geographyOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.citextOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.hstoreOid()) { + return handleHstoreWithHstoreMode(hStoreHandlingMode); + } else if (nativeType == typeRegistry.ltreeArrayOid()) { + return DataTypes.ARRAY(DataTypes.STRING()); + } else if (nativeType == typeRegistry.geometryArrayOid()) { + return DataTypes.ARRAY(DataTypes.STRING()); + } + final PostgresType resolvedType = typeRegistry.get(nativeType); + if (resolvedType.isEnumType()) { + return DataTypes.STRING(); + } throw new UnsupportedOperationException( - String.format("Doesn't support Postgres type '%s' yet", typeName)); + String.format( + "Doesn't support Postgres type '%s', 
Postgres oid '%d' yet", + column.typeName(), column.nativeType())); + } + } + + public static DataType handleNumericWithDecimalMode( + int precision, int scale, JdbcValueConverters.DecimalMode mode) { + switch (mode) { + case PRECISE: + if (precision > DecimalType.DEFAULT_SCALE + && precision <= DecimalType.MAX_PRECISION) { + return DataTypes.DECIMAL(precision, scale); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE); + case DOUBLE: + return DataTypes.DOUBLE(); + case STRING: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown decimal mode: " + mode); + } + } + + public static DataType handleBinaryWithBinaryMode( + CommonConnectorConfig.BinaryHandlingMode mode) { + switch (mode) { + case BYTES: + return DataTypes.BYTES(); + case BASE64: + case HEX: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown binary mode: " + mode); + } + } + + public static DataType handleMoneyWithDecimalMode( + int moneyFractionDigits, JdbcValueConverters.DecimalMode mode) { + switch (mode) { + case PRECISE: + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, moneyFractionDigits); + case DOUBLE: + return DataTypes.DOUBLE(); + case STRING: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown decimal mode: " + mode); + } + } + + public static DataType handleIntervalWithIntervalHandlingMode( + PostgresConnectorConfig.IntervalHandlingMode mode) { + switch (mode) { + case NUMERIC: + return DataTypes.BIGINT(); + case STRING: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown interval mode: " + mode); + } + } + + public static DataType handleDateWithTemporalMode(TemporalPrecisionMode mode) { + switch (mode) { + case ADAPTIVE: + case ADAPTIVE_TIME_MICROSECONDS: + case CONNECT: + return DataTypes.DATE(); + default: + throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); + } + } + + public static 
DataType handleTimeWithTemporalMode(TemporalPrecisionMode mode, int scale) { + switch (mode) { + case ADAPTIVE: + case ADAPTIVE_TIME_MICROSECONDS: + case CONNECT: + return DataTypes.INT(); + default: + throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); + } + } + + public static DataType handleTimestampWithTemporalMode(TemporalPrecisionMode mode, int scale) { + switch (mode) { + case ADAPTIVE: + case ADAPTIVE_TIME_MICROSECONDS: + case CONNECT: + return DataTypes.TIMESTAMP(scale); + default: + throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); + } + } + + public static DataType handleHstoreWithHstoreMode( + PostgresConnectorConfig.HStoreHandlingMode mode) { + switch (mode) { + case JSON: + return DataTypes.STRING(); + case MAP: + return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + default: + throw new IllegalArgumentException("Unknown hstore mode: " + mode); } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java index 3dc04f59d..9da05181f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java @@ -21,8 +21,10 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.data.DecimalData; +import 
org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryMapData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; @@ -59,10 +61,16 @@ import java.math.BigDecimal; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.stream.Stream; import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT; @@ -193,15 +201,506 @@ public class PostgresFullTypesITCase extends PostgresTestBase { 64822000, DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0), BinaryStringData.fromString( - "{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187}"), + "{\"coordinates\":\"[[174.9479,-36.7208]]\",\"type\":\"Point\",\"srid\":3187}"), BinaryStringData.fromString( - "{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}") + "{\"coordinates\":\"[[169.1321,-44.7032],[167.8974,-44.6414]]\",\"type\":\"MultiLineString\",\"srid\":4326}"), + true, + new byte[] {10}, + new byte[] {42}, + BinaryStringData.fromString("abc"), + 1209600000000L, + BinaryStringData.fromString( + "{\"order_id\": 10248, \"product\": \"Notebook\", \"quantity\": 5}"), + BinaryStringData.fromString( + "{\"product\": \"Pen\", \"order_id\": 10249, \"quantity\": 10}"), + BinaryStringData.fromString( + "<user>\n" + + "<id>123</id>\n" + + "<name>Alice</name>\n" + + 
"<email>al...@example.com</email>\n" + + "<preferences>\n" + + "<theme>dark</theme>\n" + + "<notifications>true</notifications>\n" + + "</preferences>\n" + + "</user>"), + BinaryStringData.fromString( + "{\"coordinates\":\"[[3.456,7.89]]\",\"type\":\"Point\",\"srid\":0}"), + BinaryStringData.fromString("foo.bar.baz"), + BinaryStringData.fromString("JohnDoe"), + BinaryStringData.fromString("{\"size\":\"L\",\"color\":\"blue\"}"), + BinaryStringData.fromString("192.168.1.1"), + BinaryStringData.fromString("[1,10)"), + BinaryStringData.fromString("[1000000000,5000000000)"), + BinaryStringData.fromString("[5.5,20.75)"), + BinaryStringData.fromString( + "[\"2023-08-01 08:00:00\",\"2023-08-01 12:00:00\")"), + BinaryStringData.fromString("[2023-08-01,2023-08-15)"), + BinaryStringData.fromString("pending"), + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + + Assertions.assertThat(recordFields(snapshotRecord, COMMON_TYPES)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testTimeTypesWithTemporalModeAdaptive() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "column_type_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("time.precision.mode", "adaptive"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("inventory.time_types") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + 
(FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, + 18460, + 64822000, + 64822123, + 64822123, + TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")), + TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")), + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2020-07-17T18:00:22.123456")), + TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHandlingDecimalModePrecise() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "precise"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_table") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider 
sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, + DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + DecimalData.fromBigDecimal(new BigDecimal("67.8912"), 8, 4), + DecimalData.fromBigDecimal(new BigDecimal("987.65"), 5, 2), + DecimalData.fromBigDecimal(new BigDecimal("12.3"), 3, 1), + DecimalData.fromBigDecimal(new BigDecimal("100.50"), 38, 2), + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_PRECISE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHandlingDecimalModeDouble() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "double"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_table") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> 
events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, 123.45, 67.8912, 987.65, 12.3, 100.50, + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_DOUBLE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHandlingDecimalModeString() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "string"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_table") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, + BinaryStringData.fromString("123.45"), + BinaryStringData.fromString("67.8912"), + BinaryStringData.fromString("987.65"), + BinaryStringData.fromString("12.3"), + 
BinaryStringData.fromString("100.5"), + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_STRING)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testZeroHandlingDecimalModePrecise() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "precise"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_zero") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, + DecimalData.fromBigDecimal(new BigDecimal("99999999.99"), 10, 2), + DecimalData.fromBigDecimal(new BigDecimal("9999.9999"), 8, 4), + null, + null, + null, + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_PRECISE)) + 
.isEqualTo(expectedSnapshot); + } + + @Test + public void testZeroHandlingDecimalModeDouble() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "double"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_zero") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, 99999999.99, 9999.9999, null, null, null, + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_DOUBLE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testZeroHandlingDecimalModeString() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "string"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new 
PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_zero") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, + BinaryStringData.fromString("99999999.99"), + BinaryStringData.fromString("9999.9999"), + null, + null, + null, }; List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_STRING)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHstoreHandlingModeMap() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "column_type_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("hstore.handling.mode", "map"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("inventory.hstore_types") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + 
.serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Map<BinaryStringData, BinaryStringData> expectedMap = new HashMap<>(); + expectedMap.put(BinaryStringData.fromString("a"), BinaryStringData.fromString("1")); + expectedMap.put(BinaryStringData.fromString("b"), BinaryStringData.fromString("2")); + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Object[] snapshotObjects = recordFields(snapshotRecord, HSTORE_TYPES_WITH_ADAPTIVE); + Map<String, String> snapshotMap = + (Map<String, String>) + ((BinaryMapData) snapshotObjects[1]) + .toJavaMap(DataTypes.STRING(), DataTypes.STRING()); + Assertions.assertThat(expectedMap).isEqualTo(snapshotMap); + } + + @Test + public void testJsonTypes() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "column_type_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("hstore.handling.mode", "map"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("inventory.json_types") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + 
configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); - Assertions.assertThat(recordFields(snapshotRecord, PG_TYPES)).isEqualTo(expectedSnapshot); + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, + BinaryStringData.fromString("{\"key1\":\"value1\"}"), + BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"), + BinaryStringData.fromString( + "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"), + BinaryStringData.fromBytes("{\"key1\": \"value1\"}".getBytes()), + BinaryStringData.fromBytes( + "{\"key1\": \"value1\", \"key2\": \"value2\"}".getBytes()), + BinaryStringData.fromBytes( + "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]" + .getBytes()), + 1L + }; + + List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot); } private <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent( @@ -237,7 +736,11 @@ public class PostgresFullTypesITCase extends PostgresTestBase { return fields; } - private static final RowType PG_TYPES = + private Instant toInstant(String ts) { + return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC+8")).toInstant(); + } + + private static final RowType COMMON_TYPES = RowType.of( DataTypes.INT(), 
DataTypes.BYTES(), @@ -259,5 +762,80 @@ public class PostgresFullTypesITCase extends PostgresTestBase { DataTypes.TIME(0), DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE), DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.BOOLEAN(), + DataTypes.BINARY(8), + DataTypes.BINARY(20), + DataTypes.CHAR(3), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), DataTypes.STRING()); + + private static final RowType TYPES_WITH_PRECISE = + RowType.of( + DataTypes.INT(), + DataTypes.DECIMAL(10, 2), + DataTypes.DECIMAL(8, 4), + DataTypes.DECIMAL(5, 2), + DataTypes.DECIMAL(3, 1), + DataTypes.DECIMAL(38, 2)); + + private static final RowType TYPES_WITH_DOUBLE = + RowType.of( + DataTypes.INT(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE()); + + private static final RowType TYPES_WITH_STRING = + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING()); + + private static final RowType TIME_TYPES_WITH_ADAPTIVE = + RowType.of( + DataTypes.INT(), + DataTypes.DATE(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(3), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_LTZ(0)); + + private static final RowType HSTORE_TYPES_WITH_ADAPTIVE = + RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); + + private static final RowType JSON_TYPES = + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + 
DataTypes.BIGINT()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql index 4790b0f49..9ac9d95bb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql @@ -21,7 +21,11 @@ DROP SCHEMA IF EXISTS inventory CASCADE; CREATE SCHEMA inventory; -- postgis is installed into public schema SET search_path TO inventory, public; +CREATE EXTENSION IF NOT EXISTS ltree; +CREATE EXTENSION IF NOT EXISTS citext; +CREATE EXTENSION IF NOT EXISTS hstore; +CREATE TYPE status AS ENUM ('pending', 'approved', 'rejected'); CREATE TABLE full_types ( @@ -46,6 +50,25 @@ CREATE TABLE full_types default_numeric_c NUMERIC, geometry_c GEOMETRY(POINT, 3187), geography_c GEOGRAPHY(MULTILINESTRING), + bit_c BIT(1), + bit_fixed_c BIT(8), + bit_varying_c BIT VARYING(20), + bpchar_c BPCHAR(3), + duration_c INTERVAL, + json_c JSON, + jsonb_c JSONB, + xml_C XML, + location POINT, + ltree_c LTREE, + username CITEXT NOT NULL, + attributes HSTORE, + inet_c INET, + int4range_c INT4RANGE, + int8range_c INT8RANGE, + numrange_c NUMRANGE, + tsrange_c TSRANGE, + daterange_c DATERANGE, + status status NOT NULL, PRIMARY KEY (id) ); @@ -56,4 +79,78 @@ INSERT INTO inventory.full_types VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17', '18:00:22', 500,'SRID=3187;POINT(174.9479 -36.7208)'::geometry, - 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography); \ No newline at end of file + 'MULTILINESTRING((169.1321 -44.7032, 
167.8974 -44.6414))'::geography,B'1',B'00001010',B'00101010','abc','2 weeks','{"order_id": 10248, "product": "Notebook", "quantity": 5}','{"order_id": 10249, "product": "Pen", "quantity": 10}'::jsonb,'<user> + <id>123</id> + <name>Alice</name> + <email>al...@example.com</email> + <preferences> + <theme>dark</theme> + <notifications>true</notifications> + </preferences> + </user>','(3.456,7.890)'::point,'foo.bar.baz','JohnDoe','color => "blue", size => "L"','192.168.1.1'::inet,'[1, 10)'::int4range,'[1000000000, 5000000000)'::int8range,'[5.5, 20.75)'::numrange, + '["2023-08-01 08:00:00", "2023-08-01 12:00:00")','["2023-08-01", "2023-08-15")','pending'); + + +CREATE TABLE time_types ( + id SERIAL PRIMARY KEY, + date_c DATE, + time_c TIME(0) WITHOUT TIME ZONE, + time_3_c TIME(3) WITHOUT TIME ZONE, + time_6_c TIME(6) WITHOUT TIME ZONE, + datetime_c TIMESTAMP(0) WITHOUT TIME ZONE, + datetime3_c TIMESTAMP(3) WITHOUT TIME ZONE, + datetime6_c TIMESTAMP(6) WITHOUT TIME ZONE, + timestamp_c TIMESTAMP WITHOUT TIME ZONE, + timestamp_tz_c TIMESTAMP WITH TIME ZONE +); +ALTER TABLE inventory.time_types + REPLICA IDENTITY FULL; + +INSERT INTO time_types +VALUES (2, + '2020-07-17', + '18:00:22', + '18:00:22.123', + '18:00:22.123456', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22.123', + '2020-07-17 18:00:22.123456', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22+08:00'); + +CREATE TABLE hstore_types ( + id SERIAL PRIMARY KEY, + hstore_c HSTORE +); + +ALTER TABLE inventory.hstore_types + REPLICA IDENTITY FULL; + +INSERT INTO hstore_types +VALUES (1, 'a => 1, b => 2'); + +CREATE TABLE json_types ( + id SERIAL PRIMARY KEY, + json_c0 JSON, + json_c1 JSON, + json_c2 JSON, + jsonb_c0 JSONB, + jsonb_c1 JSONB, + jsonb_c2 JSONB, + int_c INTEGER +); + +ALTER TABLE inventory.json_types + REPLICA IDENTITY FULL; + +INSERT INTO json_types (id,json_c0, json_c1, json_c2, jsonb_c0, jsonb_c1, jsonb_c2, int_c) +VALUES + (1, + '{"key1":"value1"}', + '{"key1":"value1","key2":"value2"}', + 
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]', + '{"key1":"value1"}'::jsonb, + '{"key1":"value1","key2":"value2"}'::jsonb, + '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]'::jsonb, + 1 + ); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql new file mode 100644 index 000000000..b2f18d589 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql @@ -0,0 +1,70 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: decimal_mode_test +-- ---------------------------------------------------------------------------------------------------------------- +-- Generate a number of tables to cover as many of the PG types as possible + + +DROP SCHEMA IF EXISTS test_decimal CASCADE; +CREATE SCHEMA IF NOT EXISTS test_decimal; + +SET search_path TO test_decimal; + +DROP TABLE IF EXISTS decimal_test_table; +CREATE TABLE decimal_test_table ( + id SERIAL PRIMARY KEY, + fixed_numeric NUMERIC(10,2), + fixed_decimal DECIMAL(8,4), + variable_numeric NUMERIC, + variable_decimal DECIMAL, + amount_money MONEY +); + +ALTER TABLE decimal_test_table REPLICA IDENTITY FULL; + +INSERT INTO decimal_test_table ( + id, + fixed_numeric, + fixed_decimal, + variable_numeric, + variable_decimal, + amount_money +) VALUES +(1, 123.45, 67.8912, 987.65, 12.3, '100.50'::money); + + +DROP TABLE IF EXISTS decimal_test_zero; +CREATE TABLE decimal_test_zero ( + id SERIAL PRIMARY KEY, + fixed_numeric NUMERIC(10,2), + fixed_decimal DECIMAL(8,4), + variable_numeric NUMERIC, + variable_decimal DECIMAL, + amount_money MONEY +); + +ALTER TABLE decimal_test_zero REPLICA IDENTITY FULL; + +INSERT INTO decimal_test_zero ( + id, + fixed_numeric, + fixed_decimal, + variable_numeric, + variable_decimal, + amount_money +) VALUES + (2, 99999999.99, 9999.9999, null, null, null); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java index 338f03d23..538e8ceef 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.debezium.event; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.GenericMapData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; @@ -31,6 +32,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.types.DataField; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; @@ -222,8 +224,17 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve return convertToRecord((RowType) type, dbzObj, schema); } }; - case ARRAY: case MAP: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + return convertToMap(dbzObj, schema); + } + }; + case ARRAY: default: throw new UnsupportedOperationException("Unsupported type: " + type); } @@ -426,6 +437,41 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve } } + protected Object convertToMap(Object dbzObj, Schema schema) throws Exception { + if (dbzObj == null) { + return null; + } + + // Obtain 
the schema for the keys and values of a Map + Schema keySchema = schema.keySchema(); + Schema valueSchema = schema.valueSchema(); + + // Infer the data types of keys and values + DataType keyType = + keySchema != null + ? schemaDataTypeInference.infer(null, keySchema) + : DataTypes.STRING(); + + DataType valueType = + valueSchema != null + ? schemaDataTypeInference.infer(null, valueSchema) + : DataTypes.STRING(); + + DeserializationRuntimeConverter keyConverter = createConverter(keyType); + DeserializationRuntimeConverter valueConverter = createConverter(valueType); + + Map<?, ?> map = (Map<?, ?>) dbzObj; + Map<Object, Object> convertedMap = new java.util.HashMap<>(map.size()); + + for (Map.Entry<?, ?> entry : map.entrySet()) { + Object convertedKey = convertField(keyConverter, entry.getKey(), keySchema); + Object convertedValue = convertField(valueConverter, entry.getValue(), valueSchema); + convertedMap.put(convertedKey, convertedValue); + } + + return new GenericMapData(convertedMap); + } + private static DeserializationRuntimeConverter wrapIntoNullableConverter( DeserializationRuntimeConverter converter) { return new DeserializationRuntimeConverter() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java index 3afa7128d..06f4c9d55 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java @@ -210,6 +210,12 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference, } protected DataType
inferMap(Object value, Schema schema) { - throw new UnsupportedOperationException("Unsupported type MAP"); + Schema keySchema = schema.keySchema(); + Schema valueSchema = schema.valueSchema(); + + DataType keyType = keySchema != null ? infer(null, keySchema) : DataTypes.STRING(); + DataType valueType = valueSchema != null ? infer(null, valueSchema) : DataTypes.STRING(); + + return DataTypes.MAP(keyType, valueType); } }
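The `convertToMap` and `inferMap` changes in this commit follow one pattern. First, pick a type for the keys and one for the values, falling back to `STRING` when the Connect schema has no key or value schema. Then rebuild the map entry by entry through the matching converters. The sketch below illustrates that loop without any dependencies. It is not the Flink CDC code: `java.util.function.Function` stands in for `DeserializationRuntimeConverter`, a plain `HashMap` stands in for `GenericMapData`, and the class and method names are hypothetical. The sketch passes null keys and values through unchanged, which is an assumption; the original delegates that case to `convertField`. The HSTORE sample assumes Debezium delivers the column as a string-to-string map, for example with `hstore.handling.mode=map`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, dependency-free sketch of the per-entry map conversion pattern. */
public class MapConversionSketch {

    /**
     * Converts every key and value with the given converters.
     * A null input map yields null; null keys/values are kept as null (assumption).
     */
    static Map<Object, Object> convertToMap(
            Map<?, ?> dbzObj,
            Function<Object, Object> keyConverter,
            Function<Object, Object> valueConverter) {
        if (dbzObj == null) {
            return null;
        }
        Map<Object, Object> converted = new HashMap<>(dbzObj.size());
        for (Map.Entry<?, ?> entry : dbzObj.entrySet()) {
            Object key = entry.getKey() == null ? null : keyConverter.apply(entry.getKey());
            Object value =
                    entry.getValue() == null ? null : valueConverter.apply(entry.getValue());
            converted.put(key, value);
        }
        return converted;
    }

    public static void main(String[] args) {
        // Mirrors the hstore_types fixture row 'a => 1, b => 2', assuming Debezium
        // emits HSTORE as a Map<String, String> (hstore.handling.mode=map).
        Map<String, String> hstore = new HashMap<>();
        hstore.put("a", "1");
        hstore.put("b", "2");
        // STRING key and value types: the converters simply keep the text form.
        Map<Object, Object> result = convertToMap(hstore, Object::toString, Object::toString);
        System.out.println(result);
    }
}
```

Building a fresh map and leaving the input untouched matches the diff, which copies entries into a new `HashMap` before wrapping it in `GenericMapData`.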
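The new `decimal_mode_test.sql` fixture and the `TYPES_WITH_PRECISE`, `TYPES_WITH_DOUBLE` and `TYPES_WITH_STRING` row types appear to exercise Debezium's `decimal.handling.mode` options (`precise`, `double`, `string`). Under these options, the same `NUMERIC`, `DECIMAL` and `MONEY` columns surface as `DECIMAL`, `DOUBLE` or `STRING` respectively. The sketch below shows what each mode does to a single value. It assumes `BigDecimal.toPlainString()` for the string form, because this diff does not show the exact formatting Debezium uses. The enum and class names are hypothetical.

```java
import java.math.BigDecimal;

/** Hypothetical sketch of how a decimal handling mode changes one NUMERIC value. */
public class DecimalModeSketch {

    /** Illustrative stand-in for the three decimal.handling.mode options. */
    enum DecimalHandlingMode {
        PRECISE,
        DOUBLE,
        STRING
    }

    static Object convert(BigDecimal value, DecimalHandlingMode mode) {
        if (value == null) {
            return null; // NULL column values stay NULL in every mode
        }
        switch (mode) {
            case PRECISE:
                return value; // exact value, maps to a DECIMAL(p, s) column
            case DOUBLE:
                return value.doubleValue(); // may lose precision for wide numbers
            case STRING:
                return value.toPlainString(); // assumed textual form, no exponent
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Same literal as fixed_numeric in decimal_test_table.
        BigDecimal fixedNumeric = new BigDecimal("123.45");
        for (DecimalHandlingMode mode : DecimalHandlingMode.values()) {
            Object converted = convert(fixedNumeric, mode);
            System.out.println(mode + " -> " + converted + " ("
                    + converted.getClass().getSimpleName() + ")");
        }
    }
}
```

The `double` mode trades exactness for a primitive type. The `precise` and `string` modes keep every digit, and `string` also sidesteps precision limits on the sink side.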