This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b3a5d3c59 [Improve][format] Using number format for Decimal type in
`seatunnel-format-compatible-debezium-json` (#5803)
7b3a5d3c59 is described below
commit 7b3a5d3c5919b96071f96a3379343cee204e1180
Author: hailin0 <[email protected]>
AuthorDate: Wed Nov 8 19:20:50 2023 +0800
[Improve][format] Using number format for Decimal type in
`seatunnel-format-compatible-debezium-json` (#5803)
---
.../debezium/json/DebeziumJsonConverter.java | 24 +++++----
.../debezium/json/TestDebeziumJsonConverter.java | 62 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
index fe96d807e9..983aca5f8d 100644
---
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.format.compatible.debezium.json;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
@@ -30,7 +31,8 @@ import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
@RequiredArgsConstructor
public class DebeziumJsonConverter implements Serializable {
@@ -68,10 +70,12 @@ public class DebeziumJsonConverter implements Serializable {
synchronized (this) {
if (keyConverter == null) {
keyConverter = new JsonConverter();
- keyConverter.configure(
- Collections.singletonMap(
- JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
keySchemaEnable),
- true);
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
keySchemaEnable);
+ configs.put(
+ JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
+ DecimalFormat.NUMERIC.name());
+ keyConverter.configure(configs, true);
keyConverterMethod =
ReflectionUtils.getDeclaredMethod(
JsonConverter.class,
@@ -88,10 +92,12 @@ public class DebeziumJsonConverter implements Serializable {
synchronized (this) {
if (valueConverter == null) {
valueConverter = new JsonConverter();
- valueConverter.configure(
- Collections.singletonMap(
- JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
valueSchemaEnable),
- false);
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
valueSchemaEnable);
+ configs.put(
+ JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
+ DecimalFormat.NUMERIC.name());
+ valueConverter.configure(configs, false);
valueConverterMethod =
ReflectionUtils.getDeclaredMethod(
JsonConverter.class,
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestDebeziumJsonConverter.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestDebeziumJsonConverter.java
new file mode 100644
index 0000000000..009cd68d25
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestDebeziumJsonConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.util.Collections;
+
+public class TestDebeziumJsonConverter { // Tests for DebeziumJsonConverter's JSON serialization.
+ // Checks that Decimal fields in both key and value are emitted as the JSON number 11.01, not a quoted string.
+ @Test
+ public void testSerializeDecimalToNumber()
+ throws InvocationTargetException, IllegalAccessException,
JsonProcessingException {
+ String key = "k"; // field name used in the key struct
+ String value = "v"; // field name used in the value struct
+ Struct keyStruct =
+ new Struct(SchemaBuilder.struct().field(key,
Decimal.builder(2).build()).build());
+ keyStruct.put(key, BigDecimal.valueOf(1101, 2)); // unscaled 1101 with scale 2 == 11.01
+ Struct valueStruct =
+ new Struct(SchemaBuilder.struct().field(value,
Decimal.builder(2).build()).build());
+ valueStruct.put(value, BigDecimal.valueOf(1101, 2)); // same 11.01 payload for the value side
+ // Build one record carrying both structs; the two map arguments are empty and the third argument is null.
+ SourceRecord sourceRecord =
+ new SourceRecord(
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ null,
+ keyStruct.schema(),
+ keyStruct,
+ valueStruct.schema(),
+ valueStruct);
+ // NOTE(review): the two flags are presumably (keySchemaEnable, valueSchemaEnable) — confirm against the class.
+ DebeziumJsonConverter converter = new DebeziumJsonConverter(false,
false);
+ Assertions.assertEquals("{\"k\":11.01}",
converter.serializeKey(sourceRecord)); // key JSON must contain the bare numeric 11.01
+ Assertions.assertEquals("{\"v\":11.01}",
converter.serializeValue(sourceRecord)); // value JSON must contain the bare numeric 11.01
+ }
+}