This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 73636562 [fix][mongodb-cdc]fix replace decimal type error when meeting
non decimal type (#448)
73636562 is described below
commit 73636562f1ce21211b2e6c3e678d4481d0eff4de
Author: North Lin <[email protected]>
AuthorDate: Fri Jul 26 16:12:40 2024 +0800
[fix][mongodb-cdc]fix replace decimal type error when meeting non decimal
type (#448)
---
.../flink/tools/cdc/mongodb/MongoDBSchema.java | 32 ++++++++++++++++------
.../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 11 ++++++++
2 files changed, 34 insertions(+), 9 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 41752c5e..08ec4509 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -22,14 +22,18 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
public class MongoDBSchema extends SourceSchema {
+ private static final Logger LOG =
LoggerFactory.getLogger(MongoDBSchema.class);
public MongoDBSchema(
ArrayList<Document> sampleData,
@@ -73,17 +77,27 @@ public class MongoDBSchema extends SourceSchema {
int existingPrecision = existingPrecisionAndScale.f0;
int existingScale = existingPrecisionAndScale.f1;
- Tuple2<Integer, Integer> currentPrecisionAndScale =
- MongoDBType.getDecimalPrecisionAndScale(newDorisType);
- int currentPrecision = currentPrecisionAndScale.f0;
- int currentScale = currentPrecisionAndScale.f1;
+ try {
+ Tuple2<Integer, Integer> currentPrecisionAndScale =
+ MongoDBType.getDecimalPrecisionAndScale(newDorisType);
+ int currentPrecision = currentPrecisionAndScale.f0;
+ int currentScale = currentPrecisionAndScale.f1;
- int newScale = Math.max(existingScale, currentScale);
- int newIntegerPartSize =
- Math.max(existingPrecision - existingScale,
currentPrecision - currentScale);
- int newPrecision = newIntegerPartSize + newScale;
+ int newScale = Math.max(existingScale, currentScale);
+ int newIntegerPartSize =
+ Math.max(
+ existingPrecision - existingScale,
currentPrecision - currentScale);
+ int newPrecision = newIntegerPartSize + newScale;
- return DorisType.DECIMAL + "(" + newPrecision + "," + newScale +
")";
+ return DorisType.DECIMAL + "(" + newPrecision + "," + newScale
+ ")";
+ } catch (DorisRuntimeException e) {
+ LOG.warn(
+ "Replace decimal type of field:{} failed, the newly
type is:{}, rollback to existing type:{}",
+ fieldName,
+ newDorisType,
+ existingField.getTypeString());
+ return existingField.getTypeString();
+ }
}
return newDorisType;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 57f7f470..75dadde8 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -44,4 +44,15 @@ public class MongoDBSchemaTest {
String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1",
"DECIMALV3(12,8)");
assertEquals("DECIMAL(15,8)", d);
}
+
+ @Test
+ public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws
Exception {
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("fields1", 1234567.666666));
+ documents.add(new Document("fields1", 1234567));
+ documents.add(new Document("fields1", 1234567.7777777));
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
+ String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1",
"DECIMALV3(12,8)");
+ assertEquals("DECIMAL(15,8)", d);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]