This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f0052ce [Improve](cdc)(MongoDB) optimize type inference to avoid unnecessary decimal conversion (#596)
8f0052ce is described below

commit 8f0052cee805417bfe1e575605e4e2b074b219b5
Author: kwonder0926 <[email protected]>
AuthorDate: Mon May 12 15:00:49 2025 +0800

    [Improve](cdc)(MongoDB) optimize type inference to avoid unnecessary decimal conversion (#596)
---
 .../flink/tools/cdc/mongodb/MongoDBSchema.java     | 30 ++++++++++++++-
 .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 45 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 2e2788de..b0a03105 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -106,6 +106,13 @@ public class MongoDBSchema extends SourceSchema {
                         && existingField.getTypeString().equals(DorisType.STRING))) {
             return DorisType.STRING;
         }
+        if (existingField != null) {
+            String existingType = existingField.getTypeString();
+            if (isDowngrade(existingType, dorisType)) {
+                return existingType;
+            }
+        }
+
         // Check and process for decimal types
         DecimalJudgement decimalJudgement = judgeDecimalField(fieldName, dorisType);
         if (DecimalJudgement.needProcessing(decimalJudgement)) {
@@ -118,6 +125,27 @@ public class MongoDBSchema extends SourceSchema {
         return dorisType;
     }
 
+    /**
+     * Check whether the newType is a downgrade from the existingType. Currently only considers
+     * numeric types: TINYINT < SMALLINT < INT < BIGINT.
+     */
+    private boolean isDowngrade(String existingType, String newType) {
+        List<String> typeHierarchy =
+                Arrays.asList(
+                        DorisType.TINYINT, DorisType.SMALLINT, DorisType.INT, DorisType.BIGINT);
+
+        int existingIndex = typeHierarchy.indexOf(existingType);
+        int newIndex = typeHierarchy.indexOf(newType);
+
+        return newIndex != -1 && existingIndex != -1 && newIndex < existingIndex;
+    }
+
+    /**
+     * Determine whether the field should be treated as a decimal type: - If the existing type is
+     * already decimal and current value is also decimal, return CERTAIN_DECIMAL. - If the field's
+     * type is convertible (e.g., INT, BIGINT) and the current value is decimal or double, return
+     * CONVERT_TO_DECIMAL. - Otherwise, no decimal processing is needed.
+     */
     private DecimalJudgement judgeDecimalField(String fieldName, String dorisType) {
         FieldSchema existingField = fields.get(fieldName);
         if (existingField == null) {
@@ -127,7 +155,7 @@ public class MongoDBSchema extends SourceSchema {
         boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL);
         if (existDecimal && isDecimal) {
             return DecimalJudgement.CERTAIN_DECIMAL;
-        } else if (CONVERT_TYPE.contains(dorisType)) {
+        } else if (existDecimal && CONVERT_TYPE.contains(dorisType)) {
             return DecimalJudgement.CONVERT_TO_DECIMAL;
         }
         return DecimalJudgement.NOT_DECIMAL;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 9a4a9eb7..795cd56f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -189,4 +189,49 @@ public class MongoDBSchemaTest {
             }
         }
     }
+
+    @Test
+    public void testIntFieldNotConvertedToDecimal() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", Integer.MAX_VALUE));
+        documents.add(new Document("fields1", 1234567));
+        documents.add(new Document("fields1", Integer.MIN_VALUE));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        FieldSchema fieldSchema = mongoDBSchema.getFields().get("fields1");
+
+        assertEquals("INT", fieldSchema.getTypeString());
+    }
+
+    @Test
+    public void testBigIntFieldNotConvertedToDecimal() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", Long.MAX_VALUE));
+        documents.add(new Document("fields1", 12233720368541346L));
+        documents.add(new Document("fields1", Long.MIN_VALUE));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            String fieldName = entry.getKey();
+            FieldSchema fieldSchema = entry.getValue();
+            if (fieldName.equals("fields1")) {
+                assertEquals("BIGINT", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void testMixedIntAndBigIntFieldsShouldConvertToBigInt() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 2147483647));
+        documents.add(new Document("fields1", 9223372036854775807L));
+        documents.add(new Document("fields1", 12234));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        FieldSchema fieldSchema = mongoDBSchema.getFields().get("fields1");
+
+        assertEquals("BIGINT", fieldSchema.getTypeString());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to