This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new efb1272 [Fix] fix decimal parse (#70)
efb1272 is described below
commit efb127254906a8ee691cb0cf2a76a6ae97bfda5e
Author: wudi <[email protected]>
AuthorDate: Thu Apr 24 11:09:36 2025 +0800
[Fix] fix decimal parse (#70)
---
.../doris/kafka/connector/converter/RecordService.java | 4 ++++
.../doris/kafka/connector/converter/TestRecordService.java | 13 ++++++++++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index 81afbd0..51b3aab 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -51,7 +51,9 @@ import org.apache.doris.kafka.connector.writer.LoadConstants;
import org.apache.doris.kafka.connector.writer.RecordBuffer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +74,8 @@ public class RecordService {
this.converter = new JsonConverter();
Map<String, Object> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
+ converterConfig.put(
+ JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
DecimalFormat.NUMERIC.name());
this.converter.configure(converterConfig, false);
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index d9688da..da6d2af 100644
---
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -77,7 +77,7 @@ public class TestRecordService {
props.put("debezium.schema.evolution", "basic");
props.put(
"doris.topic2table.map",
-
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types");
+
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types,normal_type:normal_type");
recordService = new RecordService(new DorisOptions((Map) props));
HashMap<String, String> config = new HashMap<>();
jsonConverter.configure(config, false);
@@ -239,6 +239,17 @@ public class TestRecordService {
"{\"name\":\"doris\",\"key\":\"1\"}\n{\"name\":\"doris\",\"key\":\"2\"}", s);
}
+ @Test
+ public void processNormalAllTypes() throws IOException {
+ props.put("converter.mode", "normal");
+ recordService = new RecordService(new DorisOptions((Map) props));
+ String data =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_z_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"small_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_z
[...]
+ String target =
+
"{\"id\":1,\"tiny_c\":127,\"tiny_un_c\":255,\"tiny_un_z_c\":255,\"small_c\":32767,\"small_un_c\":65535,\"small_un_z_c\":65535,\"medium_c\":8388607,\"medium_un_c\":16777215,\"medium_un_z_c\":16777215,\"int_c\":2147483647,\"int_un_c\":4294967295,\"int_un_z_c\":4294967295,\"int11_c\":2147483647,\"big_c\":9223372036854775807,\"big_un_c\":-1,\"big_un_z_c\":-1,\"varchar_c\":\"Hello
World\",\"char_c\":\"abc\",\"real_c\":123.102,\"float_c\":123.10199737548828,\"float_un_c\":123.1
[...]
+ buildProcessStructRecord("normal_type", data, target);
+ }
+
@Test
public void processMySQlGeoStructRecord() throws IOException {
String schemaStr =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]