This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ad93f55f14 [cdc] Fix PostgreSQL DECIMAL type conversion issue (#6239)
ad93f55f14 is described below

commit ad93f55f14db0b953b46100e185cd7920a7f43eb
Author: Juntao Zhang <[email protected]>
AuthorDate: Thu Sep 11 17:16:37 2025 +0800

    [cdc] Fix PostgreSQL DECIMAL type conversion issue (#6239)
---
 .../paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java | 2 +-
 .../paimon/flink/action/cdc/postgres/PostgresRecordParser.java       | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 80f99165e6..983b760294 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -167,7 +167,7 @@ public class DebeziumSchemaUtils {
         } else if (("bytes".equals(debeziumType) && className == null)) {
             // MySQL binary, varbinary, blob
             transformed = new String(Base64.getDecoder().decode(rawValue));
-        } else if ("bytes".equals(debeziumType) && 
decimalLogicalName().equals(className)) {
+        } else if ("bytes".equals(debeziumType) && 
className.endsWith(decimalLogicalName())) {
             // MySQL numeric, fixed, decimal
             try {
                 new BigDecimal(rawValue);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index c2565c1f2d..8c5be3b6a1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -179,7 +179,7 @@ public class PostgresRecordParser
             case "string":
                 return DataTypes.STRING();
             case "bytes":
-                if (decimalLogicalName().equals(field.name())) {
+                if (field.name() != null && 
field.name().endsWith(decimalLogicalName())) {
                     int precision = 
field.parameters().get("connect.decimal.precision").asInt();
                     int scale = field.parameters().get("scale").asInt();
                     return DataTypes.DECIMAL(precision, scale);
@@ -270,7 +270,8 @@ public class PostgresRecordParser
             } else if (("bytes".equals(postgresSqlType) && className == null)) 
{
                 // binary, varbinary
                 newValue = new String(Base64.getDecoder().decode(oldValue));
-            } else if ("bytes".equals(postgresSqlType) && 
decimalLogicalName().equals(className)) {
+            } else if ("bytes".equals(postgresSqlType)
+                    && className.endsWith(decimalLogicalName())) {
                 // numeric, decimal
                 try {
                     new BigDecimal(oldValue);

Reply via email to