This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ad93f55f14 [cdc] Fix PostgreSQL DECIMAL type conversion issue (#6239)
ad93f55f14 is described below
commit ad93f55f14db0b953b46100e185cd7920a7f43eb
Author: Juntao Zhang <[email protected]>
AuthorDate: Thu Sep 11 17:16:37 2025 +0800
[cdc] Fix PostgreSQL DECIMAL type conversion issue (#6239)
---
.../paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java | 2 +-
.../paimon/flink/action/cdc/postgres/PostgresRecordParser.java | 5 +++--
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 80f99165e6..983b760294 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -167,7 +167,7 @@ public class DebeziumSchemaUtils {
} else if (("bytes".equals(debeziumType) && className == null)) {
// MySQL binary, varbinary, blob
transformed = new String(Base64.getDecoder().decode(rawValue));
- } else if ("bytes".equals(debeziumType) && decimalLogicalName().equals(className)) {
+ } else if ("bytes".equals(debeziumType) && className.endsWith(decimalLogicalName())) {
// MySQL numeric, fixed, decimal
try {
new BigDecimal(rawValue);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index c2565c1f2d..8c5be3b6a1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -179,7 +179,7 @@ public class PostgresRecordParser
case "string":
return DataTypes.STRING();
case "bytes":
- if (decimalLogicalName().equals(field.name())) {
+ if (field.name() != null && field.name().endsWith(decimalLogicalName())) {
int precision = field.parameters().get("connect.decimal.precision").asInt();
int scale = field.parameters().get("scale").asInt();
return DataTypes.DECIMAL(precision, scale);
@@ -270,7 +270,8 @@ public class PostgresRecordParser
} else if (("bytes".equals(postgresSqlType) && className == null)) {
// binary, varbinary
newValue = new String(Base64.getDecoder().decode(oldValue));
- } else if ("bytes".equals(postgresSqlType) && decimalLogicalName().equals(className)) {
+ } else if ("bytes".equals(postgresSqlType)
+ && className.endsWith(decimalLogicalName())) {
// numeric, decimal
try {
new BigDecimal(oldValue);