This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b3a5d3c59 [Improve][format] Using number format for Decimal type in `seatunnel-format-compatible-debezium-json` (#5803)
7b3a5d3c59 is described below

commit 7b3a5d3c5919b96071f96a3379343cee204e1180
Author: hailin0 <[email protected]>
AuthorDate: Wed Nov 8 19:20:50 2023 +0800

    [Improve][format] Using number format for Decimal type in `seatunnel-format-compatible-debezium-json` (#5803)
---
 .../debezium/json/DebeziumJsonConverter.java       | 24 +++++----
 .../debezium/json/TestDebeziumJsonConverter.java   | 62 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 9 deletions(-)

diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
index fe96d807e9..983aca5f8d 100644
--- a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.format.compatible.debezium.json;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.json.DecimalFormat;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -30,7 +31,8 @@ import lombok.RequiredArgsConstructor;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 @RequiredArgsConstructor
 public class DebeziumJsonConverter implements Serializable {
@@ -68,10 +70,12 @@ public class DebeziumJsonConverter implements Serializable {
             synchronized (this) {
                 if (keyConverter == null) {
                     keyConverter = new JsonConverter();
-                    keyConverter.configure(
-                            Collections.singletonMap(
-                                    JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable),
-                            true);
+                    Map<String, Object> configs = new HashMap<>();
+                    configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable);
+                    configs.put(
+                            JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
+                            DecimalFormat.NUMERIC.name());
+                    keyConverter.configure(configs, true);
                     keyConverterMethod =
                             ReflectionUtils.getDeclaredMethod(
                                             JsonConverter.class,
@@ -88,10 +92,12 @@ public class DebeziumJsonConverter implements Serializable {
             synchronized (this) {
                 if (valueConverter == null) {
                     valueConverter = new JsonConverter();
-                    valueConverter.configure(
-                            Collections.singletonMap(
-                                    JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable),
-                            false);
+                    Map<String, Object> configs = new HashMap<>();
+                    configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable);
+                    configs.put(
+                            JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
+                            DecimalFormat.NUMERIC.name());
+                    valueConverter.configure(configs, false);
                     valueConverterMethod =
                             ReflectionUtils.getDeclaredMethod(
                                             JsonConverter.class,
diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestDebeziumJsonConverter.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestDebeziumJsonConverter.java
new file mode 100644
index 0000000000..009cd68d25
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestDebeziumJsonConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.util.Collections;
+
+public class TestDebeziumJsonConverter {
+
+    @Test
+    public void testSerializeDecimalToNumber()
+            throws InvocationTargetException, IllegalAccessException, JsonProcessingException {
+        String key = "k";
+        String value = "v";
+        Struct keyStruct =
+                new Struct(SchemaBuilder.struct().field(key, Decimal.builder(2).build()).build());
+        keyStruct.put(key, BigDecimal.valueOf(1101, 2));
+        Struct valueStruct =
+                new Struct(SchemaBuilder.struct().field(value, Decimal.builder(2).build()).build());
+        valueStruct.put(value, BigDecimal.valueOf(1101, 2));
+
+        SourceRecord sourceRecord =
+                new SourceRecord(
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        null,
+                        keyStruct.schema(),
+                        keyStruct,
+                        valueStruct.schema(),
+                        valueStruct);
+
+        DebeziumJsonConverter converter = new DebeziumJsonConverter(false, false);
+        Assertions.assertEquals("{\"k\":11.01}", converter.serializeKey(sourceRecord));
+        Assertions.assertEquals("{\"v\":11.01}", converter.serializeValue(sourceRecord));
+    }
+}

Reply via email to