This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e76775a6 [fix](cdc) Fix possible loss of precision in MongoCDC Decimal sampling. (#463)
e76775a6 is described below
commit e76775a682a802eb80a795e31d82a65cf9cc7161
Author: bingquanzhao <[email protected]>
AuthorDate: Thu Aug 8 17:27:04 2024 +0800
[fix](cdc) Fix possible loss of precision in MongoCDC Decimal sampling. (#463)
---
.../flink/tools/cdc/mongodb/MongoDBSchema.java | 89 ++++++++++------
.../doris/flink/tools/cdc/mongodb/MongoDBType.java | 8 +-
.../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 115 +++++++++++++++++++--
3 files changed, 173 insertions(+), 39 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 08ec4509..984419bc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -22,18 +22,31 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
public class MongoDBSchema extends SourceSchema {
private static final Logger LOG =
LoggerFactory.getLogger(MongoDBSchema.class);
+ private static final List<String> CONVERT_TYPE =
+ Arrays.asList(DorisType.BIGINT, DorisType.INT, DorisType.SMALLINT,
DorisType.TINYINT);
+
+ public enum DecimalJudgement {
+ NOT_DECIMAL,
+ CERTAIN_DECIMAL,
+ CONVERT_TO_DECIMAL;
+
+ public static boolean needProcessing(DecimalJudgement
decimalJudgement) {
+ return !decimalJudgement.equals(NOT_DECIMAL);
+ }
+ }
public MongoDBSchema(
ArrayList<Document> sampleData,
@@ -51,21 +64,50 @@ public class MongoDBSchema extends SourceSchema {
primaryKeys.add("_id");
}
- private void processSampleData(Document sampleData) {
+ @VisibleForTesting
+ protected void processSampleData(Document sampleData) {
for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
- String dorisType = MongoDBType.toDorisType(value);
- if (isDecimalField(fieldName)) {
- dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
- }
+ String dorisType = determineDorisType(fieldName, value);
fields.put(fieldName, new FieldSchema(fieldName, dorisType, null));
}
}
- private boolean isDecimalField(String fieldName) {
+ private String determineDorisType(String fieldName, Object value) {
+ String dorisType = MongoDBType.toDorisType(value);
+ // Check if the type is string or if the existing field is a string
type
+ FieldSchema existingField = fields.get(fieldName);
+ if (dorisType.equals(DorisType.STRING)
+ || (existingField != null
+ &&
existingField.getTypeString().equals(DorisType.STRING))) {
+ return DorisType.STRING;
+ }
+ // Check and process for decimal types
+ DecimalJudgement decimalJudgement = judgeDecimalField(fieldName,
dorisType);
+ if (DecimalJudgement.needProcessing(decimalJudgement)) {
+ if (decimalJudgement == DecimalJudgement.CONVERT_TO_DECIMAL) {
+ int precision = value.toString().length();
+ dorisType = MongoDBType.formatDecimalType(precision, 0);
+ }
+ dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
+ }
+ return dorisType;
+ }
+
+ private DecimalJudgement judgeDecimalField(String fieldName, String
dorisType) {
FieldSchema existingField = fields.get(fieldName);
- return existingField != null &&
existingField.getTypeString().startsWith(DorisType.DECIMAL);
+ if (existingField == null) {
+ return DecimalJudgement.NOT_DECIMAL;
+ }
+ boolean existDecimal =
existingField.getTypeString().startsWith(DorisType.DECIMAL);
+ boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL);
+ if (existDecimal && isDecimal) {
+ return DecimalJudgement.CERTAIN_DECIMAL;
+ } else if (CONVERT_TYPE.contains(dorisType)) {
+ return DecimalJudgement.CONVERT_TO_DECIMAL;
+ }
+ return DecimalJudgement.NOT_DECIMAL;
}
@VisibleForTesting
@@ -76,28 +118,17 @@ public class MongoDBSchema extends SourceSchema {
MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString());
int existingPrecision = existingPrecisionAndScale.f0;
int existingScale = existingPrecisionAndScale.f1;
+ Tuple2<Integer, Integer> currentPrecisionAndScale =
+ MongoDBType.getDecimalPrecisionAndScale(newDorisType);
+ int currentPrecision = currentPrecisionAndScale.f0;
+ int currentScale = currentPrecisionAndScale.f1;
- try {
- Tuple2<Integer, Integer> currentPrecisionAndScale =
- MongoDBType.getDecimalPrecisionAndScale(newDorisType);
- int currentPrecision = currentPrecisionAndScale.f0;
- int currentScale = currentPrecisionAndScale.f1;
-
- int newScale = Math.max(existingScale, currentScale);
- int newIntegerPartSize =
- Math.max(
- existingPrecision - existingScale,
currentPrecision - currentScale);
- int newPrecision = newIntegerPartSize + newScale;
-
- return DorisType.DECIMAL + "(" + newPrecision + "," + newScale
+ ")";
- } catch (DorisRuntimeException e) {
- LOG.warn(
- "Replace decimal type of field:{} failed, the newly
type is:{}, rollback to existing type:{}",
- fieldName,
- newDorisType,
- existingField.getTypeString());
- return existingField.getTypeString();
- }
+ int newScale = Math.max(existingScale, currentScale);
+ int newIntegerPartSize =
+ Math.max(existingPrecision - existingScale,
currentPrecision - currentScale);
+ int newPrecision = newIntegerPartSize + newScale;
+
+ return DorisType.DECIMAL + "(" + newPrecision + "," + newScale +
")";
}
return newDorisType;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
index bee85ced..578a407c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
@@ -118,9 +118,11 @@ public class MongoDBType {
decimal = new BigDecimal(decimal.toPlainString());
}
return decimal.precision() <= 38
- ? String.format(
- "%s(%s,%s)",
- DorisType.DECIMAL_V3, decimal.precision(),
Math.max(decimal.scale(), 0))
+ ? formatDecimalType(decimal.precision(),
Math.max(decimal.scale(), 0))
: DorisType.STRING;
}
+
+ public static String formatDecimalType(int precision, int scale) {
+ return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision,
scale);
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 75dadde8..a9cc8bbc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -17,10 +17,14 @@
package org.apache.doris.flink.tools.cdc.mongodb;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.bson.Document;
+import org.bson.types.Decimal128;
import org.junit.Test;
+import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -37,22 +41,119 @@ public class MongoDBSchemaTest {
}
@Test
- public void replaceDecimalTypeIfNeeded() throws Exception {
+ public void replaceDecimalTypeIfNeededTest1() throws Exception {
ArrayList<Document> documents = new ArrayList<>();
documents.add(new Document("fields1", 1234567.666666));
+ documents.add(new Document("fields1", 123456789.88888888));
+
MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
- String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1",
"DECIMALV3(12,8)");
- assertEquals("DECIMAL(15,8)", d);
+ Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ FieldSchema fieldSchema = entry.getValue();
+ String fieldName = entry.getKey();
+ if (fieldName.equals("fields1")) {
+ assertEquals("DECIMAL(17,8)", fieldSchema.getTypeString());
+ }
+ }
}
@Test
- public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws
Exception {
+ public void replaceDecimalTypeIfNeededTest2() throws Exception {
ArrayList<Document> documents = new ArrayList<>();
documents.add(new Document("fields1", 1234567.666666));
- documents.add(new Document("fields1", 1234567));
+ documents.add(new Document("fields1", 123456789));
+
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
+ Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ FieldSchema fieldSchema = entry.getValue();
+ String fieldName = entry.getKey();
+ if (fieldName.equals("fields1")) {
+ assertEquals("DECIMAL(15,6)", fieldSchema.getTypeString());
+ }
+ }
+ }
+
+ @Test
+ public void replaceDecimalTypeIfNeededTest3() throws Exception {
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("fields1", 1234567.666666));
+ documents.add(new Document("fields1", 123456789));
documents.add(new Document("fields1", 1234567.7777777));
+ documents.add(
+ new Document("fields1", new Decimal128(new
BigDecimal("12345679012.999999999"))));
+
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
+ Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ FieldSchema fieldSchema = entry.getValue();
+ String fieldName = entry.getKey();
+ if (fieldName.equals("fields1")) {
+ assertEquals("DECIMAL(20,9)", fieldSchema.getTypeString());
+ }
+ }
+ }
+
+ @Test
+ public void replaceDecimalTypeIfNeededTest4() throws Exception {
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("fields1", "yes"));
+ documents.add(new Document("fields1", 1234567.666666));
+ documents.add(new Document("fields1", 123456789));
+ documents.add(new Document("fields1", 1234567.7777777));
+ documents.add(
+ new Document("fields1", new Decimal128(new
BigDecimal("12345679012.999999999"))));
+
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
+ Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ FieldSchema fieldSchema = entry.getValue();
+ String fieldName = entry.getKey();
+ if (fieldName.equals("fields1")) {
+ assertEquals("STRING", fieldSchema.getTypeString());
+ }
+ }
+ }
+
+ @Test
+ public void replaceDecimalTypeIfNeededTest5() throws Exception {
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("fields1", 1234567.666666));
+ documents.add(new Document("fields1", 123456789));
+ documents.add(new Document("fields1", 1234567.7777777));
+ documents.add(new Document("fields1", "yes"));
+ documents.add(
+ new Document("fields1", new Decimal128(new
BigDecimal("12345679012.999999999"))));
+
+ MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
+ Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ FieldSchema fieldSchema = entry.getValue();
+ String fieldName = entry.getKey();
+ if (fieldName.equals("fields1")) {
+ assertEquals("STRING", fieldSchema.getTypeString());
+ }
+ }
+ }
+
+ @Test
+ public void replaceDecimalTypeIfNeededTest6() throws Exception {
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("fields1", 1234567.666666));
+ documents.add(new Document("fields1", 123456789));
+ documents.add(new Document("fields1", 1234567.7777777));
+ documents.add(new Document("fields1", 123444555433445L));
+ documents.add(
+ new Document("fields1", new Decimal128(new
BigDecimal("12345679012.999999999"))));
+
MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST",
"test_table", "");
- String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1",
"DECIMALV3(12,8)");
- assertEquals("DECIMAL(15,8)", d);
+ Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ FieldSchema fieldSchema = entry.getValue();
+ String fieldName = entry.getKey();
+ if (fieldName.equals("fields1")) {
+ assertEquals("DECIMAL(24,9)", fieldSchema.getTypeString());
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]