This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8f0052ce [Improve](cdc)(MongoDB) optimize type inference to avoid
unnecessary decimal conversion (#596)
8f0052ce is described below
commit 8f0052cee805417bfe1e575605e4e2b074b219b5
Author: kwonder0926 <[email protected]>
AuthorDate: Mon May 12 15:00:49 2025 +0800
[Improve](cdc)(MongoDB) optimize type inference to avoid unnecessary
decimal conversion (#596)
---
.../flink/tools/cdc/mongodb/MongoDBSchema.java | 30 ++++++++++++++-
.../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 45 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 1 deletion(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 2e2788de..b0a03105 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -106,6 +106,13 @@ public class MongoDBSchema extends SourceSchema {
&&
existingField.getTypeString().equals(DorisType.STRING))) {
return DorisType.STRING;
}
+ if (existingField != null) {
+ String existingType = existingField.getTypeString();
+ if (isDowngrade(existingType, dorisType)) {
+ return existingType;
+ }
+ }
+
// Check and process for decimal types
DecimalJudgement decimalJudgement = judgeDecimalField(fieldName,
dorisType);
if (DecimalJudgement.needProcessing(decimalJudgement)) {
@@ -118,6 +125,27 @@ public class MongoDBSchema extends SourceSchema {
return dorisType;
}
+    /**
+     * Checks whether {@code newType} ranks below {@code existingType} in the integer type
+     * hierarchy.
+     *
+     * <p>Only the integer types are ranked, in ascending order: TINYINT, SMALLINT, INT, BIGINT. A
+     * type outside that list is never reported as a downgrade.
+     *
+     * @param existingType the type string to compare against
+     * @param newType the candidate type string
+     * @return {@code true} if both types are ranked and {@code newType} comes strictly before
+     *     {@code existingType}; {@code false} otherwise
+     */
+    private boolean isDowngrade(String existingType, String newType) {
+        // Ascending order: the list index doubles as the rank of each type.
+        List<String> rankedTypes =
+                Arrays.asList(
+                        DorisType.TINYINT, DorisType.SMALLINT, DorisType.INT, DorisType.BIGINT);
+
+        int newRank = rankedTypes.indexOf(newType);
+        if (newRank == -1) {
+            return false;
+        }
+
+        int existingRank = rankedTypes.indexOf(existingType);
+        if (existingRank == -1) {
+            return false;
+        }
+
+        return newRank < existingRank;
+    }
+
+    /**
+     * Determine whether the field should be treated as a decimal type:
+     *
+     * <ul>
+     *   <li>If the existing type is already decimal and the current value is also decimal, return
+     *       CERTAIN_DECIMAL.
+     *   <li>If the existing type is already decimal and the current value's type is one of the
+     *       convertible types in CONVERT_TYPE (e.g., INT, BIGINT), return CONVERT_TO_DECIMAL.
+     *   <li>Otherwise, no decimal processing is needed.
+     * </ul>
+     */
private DecimalJudgement judgeDecimalField(String fieldName, String
dorisType) {
FieldSchema existingField = fields.get(fieldName);
if (existingField == null) {
@@ -127,7 +155,7 @@ public class MongoDBSchema extends SourceSchema {
boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL);
if (existDecimal && isDecimal) {
return DecimalJudgement.CERTAIN_DECIMAL;
- } else if (CONVERT_TYPE.contains(dorisType)) {
+ } else if (existDecimal && CONVERT_TYPE.contains(dorisType)) {
return DecimalJudgement.CONVERT_TO_DECIMAL;
}
return DecimalJudgement.NOT_DECIMAL;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 9a4a9eb7..795cd56f 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -189,4 +189,49 @@ public class MongoDBSchemaTest {
}
}
}
+
+    /** A field sampled only with int values must be reported as INT. */
+    @Test
+    public void testIntFieldNotConvertedToDecimal() throws Exception {
+        ArrayList<Document> samples = new ArrayList<>();
+        for (int value : new int[] {Integer.MAX_VALUE, 1234567, Integer.MIN_VALUE}) {
+            samples.add(new Document("fields1", value));
+        }
+
+        MongoDBSchema schema = new MongoDBSchema(samples, "db_TEST", "test_table", "");
+        String inferredType = schema.getFields().get("fields1").getTypeString();
+
+        assertEquals("INT", inferredType);
+    }
+
+    /** A field sampled only with long values must be reported as BIGINT. */
+    @Test
+    public void testBigIntFieldNotConvertedToDecimal() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", Long.MAX_VALUE));
+        documents.add(new Document("fields1", 12233720368541346L));
+        documents.add(new Document("fields1", Long.MIN_VALUE));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        // Look the field up directly: asserting inside a loop over the map would pass vacuously
+        // if "fields1" were missing from the inferred schema.
+        FieldSchema fieldSchema = mongoDBSchema.getFields().get("fields1");
+
+        assertEquals("BIGINT", fieldSchema.getTypeString());
+    }
+
+    /** A field sampled with both int and long values must be reported as BIGINT. */
+    @Test
+    public void testMixedIntAndBigIntFieldsShouldConvertToBigInt() throws Exception {
+        // Boxed as Integer, Long, Integer so the sample mixes both integer widths.
+        Object[] values = {2147483647, 9223372036854775807L, 12234};
+        ArrayList<Document> samples = new ArrayList<>();
+        for (Object value : values) {
+            samples.add(new Document("fields1", value));
+        }
+
+        MongoDBSchema schema = new MongoDBSchema(samples, "db_TEST", "test_table", "");
+        String inferredType = schema.getFields().get("fields1").getTypeString();
+
+        assertEquals("BIGINT", inferredType);
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]