This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 03faf5b4 [improve](testcase) increase mongo cdc UT (#393)
03faf5b4 is described below
commit 03faf5b4f52aa1ffad168ef16b23a5cd803e432f
Author: bingquanzhao <[email protected]>
AuthorDate: Tue Jun 4 20:40:24 2024 +0800
[improve](testcase) increase mongo cdc UT (#393)
---
.../flink/tools/cdc/mongodb/MongoDBSchema.java | 4 +-
.../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 47 +++++++
.../flink/tools/cdc/mongodb/MongoDBTypeTest.java | 139 +++++++++++++++++++++
.../tools/cdc/mongodb/MongoDateConverterTest.java | 32 +++++
.../mongodb/MongoParsingProcessFunctionTest.java | 35 ++++++
5 files changed, 256 insertions(+), 1 deletion(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 2c2e1b48..41752c5e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.tools.cdc.mongodb;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.doris.flink.catalog.doris.DorisType;
@@ -63,7 +64,8 @@ public class MongoDBSchema extends SourceSchema {
return existingField != null &&
existingField.getTypeString().startsWith(DorisType.DECIMAL);
}
- private String replaceDecimalTypeIfNeeded(String fieldName, String
newDorisType) {
+ @VisibleForTesting
+ protected String replaceDecimalTypeIfNeeded(String fieldName, String
newDorisType) {
FieldSchema existingField = fields.get(fieldName);
if (existingField.getTypeString().startsWith(DorisType.DECIMAL)) {
Tuple2<Integer, Integer> existingPrecisionAndScale =
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
new file mode 100644
index 00000000..57f7f470
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.bson.Document;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoDBSchemaTest {
+
+ @Test
+ public void convertToDorisType() {}
+
+ @Test
+ public void getCdcTableName() throws Exception {
+ MongoDBSchema mongoDBSchema =
+ new MongoDBSchema(new ArrayList<Document>(), "db_TEST",
"test_table", "");
+ assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName());
+ }
+
+ @Test
+ public void replaceDecimalTypeIfNeeded() throws Exception {
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("fields1", 1234567.666666));
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
+ String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1",
"DECIMALV3(12,8)");
+ assertEquals("DECIMAL(15,8)", d);
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
new file mode 100644
index 00000000..ee511ce2
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.bson.BsonArray;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class MongoDBTypeTest {
+
+ @Test
+ public void toDorisType() {
+ assertEquals(DorisType.INT, MongoDBType.toDorisType(new Integer(123)));
+ assertEquals(DorisType.DATETIME_V2 + "(3)",
MongoDBType.toDorisType(new Date()));
+ assertEquals(DorisType.BIGINT, MongoDBType.toDorisType(new
Long(1234567891)));
+ assertEquals("DECIMALV3(6,2)", MongoDBType.toDorisType(new
Double("1234.56")));
+ assertEquals(DorisType.BOOLEAN, MongoDBType.toDorisType(new
Boolean(true)));
+ assertEquals(DorisType.STRING, MongoDBType.toDorisType("string"));
+ assertEquals(
+ DorisType.VARCHAR + "(30)",
+ MongoDBType.toDorisType(new
ObjectId("66583533791a67a6f8c5a339")));
+ assertEquals(DorisType.ARRAY, MongoDBType.toDorisType(new
BsonArray()));
+ assertEquals(
+ "DECIMALV3(10,5)",
+ MongoDBType.toDorisType(new Decimal128(new
BigDecimal("12345.55555"))));
+ }
+
+ @Test
+ public void jsonNodeToDorisType() {
+ assertEquals(DorisType.INT, MongoDBType.jsonNodeToDorisType(new
IntNode(1234)));
+ assertEquals(DorisType.STRING, MongoDBType.jsonNodeToDorisType(new
TextNode("text")));
+ assertEquals(DorisType.BIGINT, MongoDBType.jsonNodeToDorisType(new
LongNode(1234568948)));
+ assertEquals(DorisType.DOUBLE, MongoDBType.jsonNodeToDorisType(new
DoubleNode(1234.23)));
+ assertEquals(DorisType.BOOLEAN,
MongoDBType.jsonNodeToDorisType(BooleanNode.TRUE));
+ assertEquals(
+ DorisType.ARRAY,
+
MongoDBType.jsonNodeToDorisType(JsonNodeFactory.instance.arrayNode()));
+ assertEquals(
+ "DECIMALV3(6,2)",
+ MongoDBType.jsonNodeToDorisType(new DecimalNode(new
BigDecimal("1234.23"))));
+
+ ObjectNode dateJsonNodes = JsonNodeFactory.instance.objectNode();
+ dateJsonNodes.put(MongoDBType.DATE_TYPE, "");
+ assertEquals(DorisType.DATETIME_V2 + "(3)",
MongoDBType.jsonNodeToDorisType(dateJsonNodes));
+
+ ObjectNode decimalJsonNodes = JsonNodeFactory.instance.objectNode();
+ decimalJsonNodes.put(MongoDBType.DECIMAL_TYPE, "1234.23");
+ assertEquals("DECIMALV3(6,2)",
MongoDBType.jsonNodeToDorisType(decimalJsonNodes));
+
+ ObjectNode longJsonNodes = JsonNodeFactory.instance.objectNode();
+ longJsonNodes.put(MongoDBType.LONG_TYPE, "1234234466");
+ assertEquals(DorisType.BIGINT,
MongoDBType.jsonNodeToDorisType(longJsonNodes));
+ }
+
+ @Test
+ public void getDecimalPrecisionAndScale() {
+ String decimalString1 = "DECIMAL(13,6)";
+ String decimalString2 = "DECIMAL(20,10)";
+ String decimalString3 = "DECIMAL(5,10)";
+ String decimalString4 = "DECIMAL(10,5)";
+
+ Tuple2<Integer, Integer> decimalPrecision1 =
+ MongoDBType.getDecimalPrecisionAndScale(decimalString1);
+ assertArrayEquals(
+ new int[] {13, 6}, new int[] {decimalPrecision1.f0,
decimalPrecision1.f1});
+
+ Tuple2<Integer, Integer> decimalPrecision2 =
+ MongoDBType.getDecimalPrecisionAndScale(decimalString2);
+ assertArrayEquals(
+ new int[] {20, 10}, new int[] {decimalPrecision2.f0,
decimalPrecision2.f1});
+
+ Tuple2<Integer, Integer> decimalPrecision3 =
+ MongoDBType.getDecimalPrecisionAndScale(decimalString3);
+ assertArrayEquals(
+ new int[] {5, 10}, new int[] {decimalPrecision3.f0,
decimalPrecision3.f1});
+
+ Tuple2<Integer, Integer> decimalPrecision4 =
+ MongoDBType.getDecimalPrecisionAndScale(decimalString4);
+ assertArrayEquals(
+ new int[] {10, 5}, new int[] {decimalPrecision4.f0,
decimalPrecision4.f1});
+ }
+
+ @Test
+ public void checkAndRebuildBigDecimal() {
+
+ HashMap<String, String> decimalTestMap =
+ new HashMap<String, String>() {
+ {
+ put("123456789.55555", "DECIMALV3(14,5)");
+ put("123456789.666666", "DECIMALV3(15,6)");
+ put("123456789.7777777", "DECIMALV3(16,7)");
+ put("123456789.88888888", "DECIMALV3(17,8)");
+ put("123456789.999999999", "DECIMALV3(18,9)");
+ put("123456789.1", "DECIMALV3(10,1)");
+ put("123456789.22", "DECIMALV3(11,2)");
+ put("12345E4", "DECIMALV3(9,0)");
+ put("0.12345E4", "DECIMALV3(5,1)");
+ }
+ };
+
+ decimalTestMap.forEach(
+ (k, v) ->
+ assertEquals(MongoDBType.checkAndRebuildBigDecimal(new
BigDecimal(k)), v));
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverterTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverterTest.java
new file mode 100644
index 00000000..85608cc1
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverterTest.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoDateConverterTest {
+
+ @Test
+ public void convertTimestampToString() {
+ Long timestamp = 1717488217456L;
+ String formatStr =
MongoDateConverter.convertTimestampToString(timestamp);
+ assertEquals("2024-06-04 16:03:37.456000", formatStr);
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
new file mode 100644
index 00000000..e0c09b0f
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoParsingProcessFunctionTest {
+
+ @Test
+ public void getRecordTableName() throws Exception {
+ String record =
+ "{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\":
\\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\":
{\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\":
\\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\":
{\\\"$oid\\\":
\\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime
[...]
+ MongoParsingProcessFunction mongoParsingProcessFunction =
+ new MongoParsingProcessFunction(null);
+ String recordTableName =
mongoParsingProcessFunction.getRecordTableName(record);
+ assertEquals("cdc_test", recordTableName);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]