This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 03faf5b4 [improve](testcase) increase mongo cdc UT (#393)
03faf5b4 is described below

commit 03faf5b4f52aa1ffad168ef16b23a5cd803e432f
Author: bingquanzhao <[email protected]>
AuthorDate: Tue Jun 4 20:40:24 2024 +0800

    [improve](testcase) increase mongo cdc UT (#393)
---
 .../flink/tools/cdc/mongodb/MongoDBSchema.java     |   4 +-
 .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java |  47 +++++++
 .../flink/tools/cdc/mongodb/MongoDBTypeTest.java   | 139 +++++++++++++++++++++
 .../tools/cdc/mongodb/MongoDateConverterTest.java  |  32 +++++
 .../mongodb/MongoParsingProcessFunctionTest.java   |  35 ++++++
 5 files changed, 256 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 2c2e1b48..41752c5e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.tools.cdc.mongodb;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 import org.apache.doris.flink.catalog.doris.DorisType;
@@ -63,7 +64,8 @@ public class MongoDBSchema extends SourceSchema {
         return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL);
     }
 
-    private String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) {
+    @VisibleForTesting
+    protected String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) {
         FieldSchema existingField = fields.get(fieldName);
         if (existingField.getTypeString().startsWith(DorisType.DECIMAL)) {
             Tuple2<Integer, Integer> existingPrecisionAndScale =
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
new file mode 100644
index 00000000..57f7f470
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.bson.Document;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoDBSchemaTest {
+
+    @Test
+    public void convertToDorisType() {}
+
+    @Test
+    public void getCdcTableName() throws Exception {
+        MongoDBSchema mongoDBSchema =
+                new MongoDBSchema(new ArrayList<Document>(), "db_TEST", "test_table", "");
+        assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName());
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeeded() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)");
+        assertEquals("DECIMAL(15,8)", d);
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
new file mode 100644
index 00000000..ee511ce2
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.bson.BsonArray;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class MongoDBTypeTest {
+
+    @Test
+    public void toDorisType() {
+        assertEquals(DorisType.INT, MongoDBType.toDorisType(new Integer(123)));
+        assertEquals(DorisType.DATETIME_V2 + "(3)", MongoDBType.toDorisType(new Date()));
+        assertEquals(DorisType.BIGINT, MongoDBType.toDorisType(new Long(1234567891)));
+        assertEquals("DECIMALV3(6,2)", MongoDBType.toDorisType(new Double("1234.56")));
+        assertEquals(DorisType.BOOLEAN, MongoDBType.toDorisType(new Boolean(true)));
+        assertEquals(DorisType.STRING, MongoDBType.toDorisType("string"));
+        assertEquals(
+                DorisType.VARCHAR + "(30)",
+                MongoDBType.toDorisType(new ObjectId("66583533791a67a6f8c5a339")));
+        assertEquals(DorisType.ARRAY, MongoDBType.toDorisType(new BsonArray()));
+        assertEquals(
+                "DECIMALV3(10,5)",
+                MongoDBType.toDorisType(new Decimal128(new BigDecimal("12345.55555"))));
+    }
+
+    @Test
+    public void jsonNodeToDorisType() {
+        assertEquals(DorisType.INT, MongoDBType.jsonNodeToDorisType(new IntNode(1234)));
+        assertEquals(DorisType.STRING, MongoDBType.jsonNodeToDorisType(new TextNode("text")));
+        assertEquals(DorisType.BIGINT, MongoDBType.jsonNodeToDorisType(new LongNode(1234568948)));
+        assertEquals(DorisType.DOUBLE, MongoDBType.jsonNodeToDorisType(new DoubleNode(1234.23)));
+        assertEquals(DorisType.BOOLEAN, MongoDBType.jsonNodeToDorisType(BooleanNode.TRUE));
+        assertEquals(
+                DorisType.ARRAY,
+                MongoDBType.jsonNodeToDorisType(JsonNodeFactory.instance.arrayNode()));
+        assertEquals(
+                "DECIMALV3(6,2)",
+                MongoDBType.jsonNodeToDorisType(new DecimalNode(new BigDecimal("1234.23"))));
+
+        ObjectNode dateJsonNodes = JsonNodeFactory.instance.objectNode();
+        dateJsonNodes.put(MongoDBType.DATE_TYPE, "");
+        assertEquals(DorisType.DATETIME_V2 + "(3)", MongoDBType.jsonNodeToDorisType(dateJsonNodes));
+
+        ObjectNode decimalJsonNodes = JsonNodeFactory.instance.objectNode();
+        decimalJsonNodes.put(MongoDBType.DECIMAL_TYPE, "1234.23");
+        assertEquals("DECIMALV3(6,2)", MongoDBType.jsonNodeToDorisType(decimalJsonNodes));
+
+        ObjectNode longJsonNodes = JsonNodeFactory.instance.objectNode();
+        longJsonNodes.put(MongoDBType.LONG_TYPE, "1234234466");
+        assertEquals(DorisType.BIGINT, MongoDBType.jsonNodeToDorisType(longJsonNodes));
+    }
+
+    @Test
+    public void getDecimalPrecisionAndScale() {
+        String decimalString1 = "DECIMAL(13,6)";
+        String decimalString2 = "DECIMAL(20,10)";
+        String decimalString3 = "DECIMAL(5,10)";
+        String decimalString4 = "DECIMAL(10,5)";
+
+        Tuple2<Integer, Integer> decimalPrecision1 =
+                MongoDBType.getDecimalPrecisionAndScale(decimalString1);
+        assertArrayEquals(
+                new int[] {13, 6}, new int[] {decimalPrecision1.f0, decimalPrecision1.f1});
+
+        Tuple2<Integer, Integer> decimalPrecision2 =
+                MongoDBType.getDecimalPrecisionAndScale(decimalString2);
+        assertArrayEquals(
+                new int[] {20, 10}, new int[] {decimalPrecision2.f0, decimalPrecision2.f1});
+
+        Tuple2<Integer, Integer> decimalPrecision3 =
+                MongoDBType.getDecimalPrecisionAndScale(decimalString3);
+        assertArrayEquals(
+                new int[] {5, 10}, new int[] {decimalPrecision3.f0, decimalPrecision3.f1});
+
+        Tuple2<Integer, Integer> decimalPrecision4 =
+                MongoDBType.getDecimalPrecisionAndScale(decimalString4);
+        assertArrayEquals(
+                new int[] {10, 5}, new int[] {decimalPrecision4.f0, decimalPrecision4.f1});
+    }
+
+    @Test
+    public void checkAndRebuildBigDecimal() {
+
+        HashMap<String, String> decimalTestMap =
+                new HashMap<String, String>() {
+                    {
+                        put("123456789.55555", "DECIMALV3(14,5)");
+                        put("123456789.666666", "DECIMALV3(15,6)");
+                        put("123456789.7777777", "DECIMALV3(16,7)");
+                        put("123456789.88888888", "DECIMALV3(17,8)");
+                        put("123456789.999999999", "DECIMALV3(18,9)");
+                        put("123456789.1", "DECIMALV3(10,1)");
+                        put("123456789.22", "DECIMALV3(11,2)");
+                        put("12345E4", "DECIMALV3(9,0)");
+                        put("0.12345E4", "DECIMALV3(5,1)");
+                    }
+                };
+
+        decimalTestMap.forEach(
+                (k, v) ->
+                        assertEquals(MongoDBType.checkAndRebuildBigDecimal(new BigDecimal(k)), v));
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverterTest.java
new file mode 100644
index 00000000..85608cc1
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverterTest.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoDateConverterTest {
+
+    @Test
+    public void convertTimestampToString() {
+        Long timestamp = 1717488217456L;
+        String formatStr = MongoDateConverter.convertTimestampToString(timestamp);
+        assertEquals("2024-06-04 16:03:37.456000", formatStr);
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
new file mode 100644
index 00000000..e0c09b0f
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoParsingProcessFunctionTest {
+
+    @Test
+    public void getRecordTableName() throws Exception {
+        String record =
+                "{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\": 
\\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\":
 {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\": 
\\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\":
 {\\\"$oid\\\": 
\\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime 
[...]
+        MongoParsingProcessFunction mongoParsingProcessFunction =
+                new MongoParsingProcessFunction(null);
+        String recordTableName = mongoParsingProcessFunction.getRecordTableName(record);
+        assertEquals("cdc_test", recordTableName);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to