This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 28e7e97  fix decimal(65,18) loss of precision (#105)
28e7e97 is described below

commit 28e7e9714d4edf5e479949098c074382cafb8828
Author: DongLiang-0 <[email protected]>
AuthorDate: Wed Feb 22 22:52:21 2023 +0800

    fix decimal(65,18) loss of precision (#105)
    
    * fix decimal(65,18) loss of precision
    * Fix decimal 0 missing problem
---
 .../doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index bd685ef..60fca85 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -19,8 +19,10 @@ package org.apache.doris.flink.sink.writer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
@@ -74,12 +76,16 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         this.database = tableInfo[0];
         this.table = tableInfo[1];
         this.sourceTableName = sourceTableName;
+        // Prevent loss of decimal data precision
+        
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        JsonNodeFactory jsonNodeFactory = 
JsonNodeFactory.withExactBigDecimals(true);
+        this.objectMapper.setNodeFactory(jsonNodeFactory);
     }
 
     @Override
     public byte[] serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
-        JsonNode recordRoot = objectMapper.readTree(record);
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             //schema change ddl


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to