This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7f09d028 [Fix] Fix the issue with parsing MongoDB timestamp and array types (#547)
7f09d028 is described below
commit 7f09d02831e3c0f595ddb7742aa481e8060a06b5
Author: Qinghuang Xu <[email protected]>
AuthorDate: Mon Feb 10 10:47:59 2025 +0800
[Fix] Fix the issue with parsing MongoDB timestamp and array types (#547)
---
.../tools/cdc/mongodb/ChangeStreamConstant.java | 1 +
.../doris/flink/tools/cdc/mongodb/MongoDBType.java | 32 ++++++++++++---------
.../serializer/MongoJsonDebeziumSchemaChange.java | 10 +++++--
.../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 33 ++++++++++++++++++++++
.../flink/tools/cdc/mongodb/MongoDBTypeTest.java | 22 +++++++++++----
5 files changed, 78 insertions(+), 20 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
index f8772c9f..4e0d8e53 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
@@ -35,6 +35,7 @@ public class ChangeStreamConstant implements Serializable {
public static final String FIELD_DOCUMENT_KEY = "documentKey";
public static final String DATE_FIELD = "$date";
+ public static final String TIMESTAMP_FIELD = "$timestamp";
public static final String DECIMAL_FIELD = "$numberDecimal";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
index 578a407c..cb4896ab 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
@@ -30,26 +30,29 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.bson.BsonArray;
+import org.bson.BsonTimestamp;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import java.math.BigDecimal;
import java.util.Date;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class MongoDBType {
-
- public static final String DATE_TYPE = "$date";
- public static final String DECIMAL_TYPE = "$numberDecimal";
- public static final String LONG_TYPE = "$numberLong";
+import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD;
+import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD;
+import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
+import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD;
+public class MongoDBType {
public static String toDorisType(Object value) {
if (value instanceof Integer) {
return DorisType.INT;
} else if (value instanceof Date) {
return DorisType.DATETIME_V2 + "(3)";
+ } else if (value instanceof BsonTimestamp) {
+ return DorisType.DATETIME_V2 + "(0)";
} else if (value instanceof Long) {
return DorisType.BIGINT;
} else if (value instanceof Double) {
@@ -60,8 +63,8 @@ public class MongoDBType {
return DorisType.STRING;
} else if (value instanceof ObjectId) {
return DorisType.VARCHAR + "(30)";
- } else if (value instanceof BsonArray) {
- return DorisType.ARRAY;
+ } else if (value instanceof List) {
+ return DorisType.ARRAY + "<" + DorisType.STRING + ">";
} else if (value instanceof Decimal128) {
return checkAndRebuildBigDecimal(((Decimal128)
value).bigDecimalValue());
} else {
@@ -77,19 +80,22 @@ public class MongoDBType {
} else if (value instanceof LongNode) {
return DorisType.BIGINT;
} else if (value instanceof DoubleNode) {
+ // When mongo double is in the JsonNode, it's actually a decimal
type
return DorisType.DOUBLE;
} else if (value instanceof BooleanNode) {
return DorisType.BOOLEAN;
} else if (value instanceof ArrayNode) {
- return DorisType.ARRAY;
+ return DorisType.ARRAY + "<" + DorisType.STRING + ">";
} else if (value instanceof DecimalNode) {
return checkAndRebuildBigDecimal(value.decimalValue());
} else if (value instanceof ObjectNode) {
- if (value.size() == 1 && value.get(DATE_TYPE) != null) {
+ if (value.size() == 1 && value.get(DATE_FIELD) != null) {
return DorisType.DATETIME_V2 + "(3)";
- } else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) {
- return checkAndRebuildBigDecimal(new
BigDecimal(value.get(DECIMAL_TYPE).asText()));
- } else if (value.size() == 1 && value.get(LONG_TYPE) != null) {
+ } else if (value.size() == 1 && value.get(TIMESTAMP_FIELD) !=
null) {
+ return DorisType.DATETIME_V2 + "(0)";
+ } else if (value.size() == 1 && value.get(DECIMAL_FIELD) != null) {
+ return checkAndRebuildBigDecimal(new
BigDecimal(value.get(DECIMAL_FIELD).asText()));
+ } else if (value.size() == 1 && value.get(LONG_FIELD) != null) {
return DorisType.BIGINT;
} else {
return DorisType.STRING;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
index c3a4a7d8..c67e856e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
@@ -50,6 +50,7 @@ import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIEL
import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
+import static
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD;
public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
@@ -67,7 +68,7 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
private final DorisOptions dorisOptions;
private final Set<String> specialFields =
- new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD,
LONG_FIELD));
+ new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD,
DECIMAL_FIELD, LONG_FIELD));
public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext
changeContext) {
this.objectMapper = changeContext.getObjectMapper();
@@ -127,7 +128,12 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
if (specialFields.contains(fieldKey)) {
switch (fieldKey) {
case DATE_FIELD:
- long timestamp =
fieldNode.get(DATE_FIELD).asLong();
+ case TIMESTAMP_FIELD:
+ JsonNode jsonNode =
fieldNode.get(fieldKey);
+ long timestamp =
+
fieldKey.equals(TIMESTAMP_FIELD)
+ ?
jsonNode.get("t").asLong() * 1000L
+ :
jsonNode.asLong();
String formattedDate =
MongoDateConverter.convertTimestampToString(
timestamp);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index a9cc8bbc..2f095608 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -18,12 +18,16 @@
package org.apache.doris.flink.tools.cdc.mongodb;
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
import org.junit.Test;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -40,6 +44,35 @@ public class MongoDBSchemaTest {
assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName());
}
+ @Test
+ public void testMongoSampleDataFields() throws Exception {
+ ArrayList<Document> sampleData = new ArrayList<>();
+ sampleData.add(new Document("_id", new
ObjectId("678643e649a4c9239b04297b")));
+ sampleData.add(new Document("c_string", "Hello, MongoDB!"));
+ sampleData.add(new Document("c_bool", true));
+ sampleData.add(new Document("c_int", 123456));
+ sampleData.add(new Document("c_long", 1234567890123456789L));
+ sampleData.add(new Document("c_double", 123.45));
+ sampleData.add(new Document("c_decimal", new
Decimal128(BigDecimal.valueOf(12345.67))));
+ sampleData.add(new Document("c_date", new Date(1234567890)));
+ sampleData.add(new Document("c_timestamp", new
BsonTimestamp(1334567890)));
+ Map<String, String> map = new LinkedHashMap<>();
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+ sampleData.add(new Document("c_object", new Document(map)));
+ ArrayList<Object> array = new ArrayList<>();
+ array.add("str1");
+ array.add("str2");
+ array.add(789);
+ sampleData.add(new Document("c_array", array));
+
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(sampleData, "db_TEST",
"test_table", "");
+
+ assertEquals(
+ "{_id=FieldSchema{name='_id', typeString='VARCHAR(30)',
defaultValue='null', comment='null'}, c_string=FieldSchema{name='c_string',
typeString='STRING', defaultValue='null', comment='null'},
c_bool=FieldSchema{name='c_bool', typeString='BOOLEAN', defaultValue='null',
comment='null'}, c_int=FieldSchema{name='c_int', typeString='INT',
defaultValue='null', comment='null'}, c_long=FieldSchema{name='c_long',
typeString='BIGINT', defaultValue='null', comment='null'}, c_double=F [...]
+ mongoDBSchema.getFields().toString());
+ }
+
@Test
public void replaceDecimalTypeIfNeededTest1() throws Exception {
ArrayList<Document> documents = new ArrayList<>();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
index ee511ce2..4e273ab8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
@@ -29,6 +29,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.bson.BsonArray;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.junit.Test;
@@ -46,6 +49,7 @@ public class MongoDBTypeTest {
public void toDorisType() {
assertEquals(DorisType.INT, MongoDBType.toDorisType(new Integer(123)));
assertEquals(DorisType.DATETIME_V2 + "(3)",
MongoDBType.toDorisType(new Date()));
+ assertEquals(DorisType.DATETIME_V2 + "(0)",
MongoDBType.toDorisType(new BsonTimestamp()));
assertEquals(DorisType.BIGINT, MongoDBType.toDorisType(new
Long(1234567891)));
assertEquals("DECIMALV3(6,2)", MongoDBType.toDorisType(new
Double("1234.56")));
assertEquals(DorisType.BOOLEAN, MongoDBType.toDorisType(new
Boolean(true)));
@@ -53,10 +57,13 @@ public class MongoDBTypeTest {
assertEquals(
DorisType.VARCHAR + "(30)",
MongoDBType.toDorisType(new
ObjectId("66583533791a67a6f8c5a339")));
- assertEquals(DorisType.ARRAY, MongoDBType.toDorisType(new
BsonArray()));
assertEquals(
"DECIMALV3(10,5)",
MongoDBType.toDorisType(new Decimal128(new
BigDecimal("12345.55555"))));
+ BsonArray bsonArray = new BsonArray();
+ bsonArray.add(new BsonString("string"));
+ bsonArray.add(new BsonInt64(123456789));
+ assertEquals(DorisType.ARRAY + "<STRING>",
MongoDBType.toDorisType(bsonArray));
}
@Test
@@ -67,22 +74,27 @@ public class MongoDBTypeTest {
assertEquals(DorisType.DOUBLE, MongoDBType.jsonNodeToDorisType(new
DoubleNode(1234.23)));
assertEquals(DorisType.BOOLEAN,
MongoDBType.jsonNodeToDorisType(BooleanNode.TRUE));
assertEquals(
- DorisType.ARRAY,
+ DorisType.ARRAY + "<STRING>",
MongoDBType.jsonNodeToDorisType(JsonNodeFactory.instance.arrayNode()));
assertEquals(
"DECIMALV3(6,2)",
MongoDBType.jsonNodeToDorisType(new DecimalNode(new
BigDecimal("1234.23"))));
ObjectNode dateJsonNodes = JsonNodeFactory.instance.objectNode();
- dateJsonNodes.put(MongoDBType.DATE_TYPE, "");
+ dateJsonNodes.put(ChangeStreamConstant.DATE_FIELD, "");
assertEquals(DorisType.DATETIME_V2 + "(3)",
MongoDBType.jsonNodeToDorisType(dateJsonNodes));
+ ObjectNode timestampJsonNodes = JsonNodeFactory.instance.objectNode();
+ timestampJsonNodes.put(ChangeStreamConstant.TIMESTAMP_FIELD, "");
+ assertEquals(
+ DorisType.DATETIME_V2 + "(0)",
MongoDBType.jsonNodeToDorisType(timestampJsonNodes));
+
ObjectNode decimalJsonNodes = JsonNodeFactory.instance.objectNode();
- decimalJsonNodes.put(MongoDBType.DECIMAL_TYPE, "1234.23");
+ decimalJsonNodes.put(ChangeStreamConstant.DECIMAL_FIELD, "1234.23");
assertEquals("DECIMALV3(6,2)",
MongoDBType.jsonNodeToDorisType(decimalJsonNodes));
ObjectNode longJsonNodes = JsonNodeFactory.instance.objectNode();
- longJsonNodes.put(MongoDBType.LONG_TYPE, "1234234466");
+ longJsonNodes.put(ChangeStreamConstant.LONG_FIELD, "1234234466");
assertEquals(DorisType.BIGINT,
MongoDBType.jsonNodeToDorisType(longJsonNodes));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]