This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 067eaf12 [improve](cdc) Optimize MongoCDC sampleSize calculation logic (#542)
067eaf12 is described below
commit 067eaf12fda5a31db0f6edd1ea1a33225ccbb870
Author: Qinghuang Xu <[email protected]>
AuthorDate: Mon Jan 13 10:04:06 2025 +0800
[improve](cdc) Optimize MongoCDC sampleSize calculation logic (#542)
---
.../tools/cdc/mongodb/MongoDBDatabaseSync.java | 32 ++++++++++++++-
.../tools/cdc/mongodb/MongoDBDatabaseSyncTest.java | 48 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 1 deletion(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 3526c075..ca034ac3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -68,6 +68,18 @@ public class MongoDBDatabaseSync extends DatabaseSync {
.defaultValue(0.2)
.withDescription("mongo cdc sample percent");
+ public static final ConfigOption<Long> MONGO_CDC_MIN_SAMPLE_SIZE =
+ ConfigOptions.key("schema.min-sample-size")
+ .longType()
+ .defaultValue(1000L)
+ .withDescription("mongo cdc min sample size");
+
+ public static final ConfigOption<Long> MONGO_CDC_MAX_SAMPLE_SIZE =
+ ConfigOptions.key("schema.max-sample-size")
+ .longType()
+ .defaultValue(100000L)
+ .withDescription("mongo cdc max sample size");
+
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
@@ -101,6 +113,8 @@ public class MongoDBDatabaseSync extends DatabaseSync {
MongoClientSettings settings = settingsBuilder.build();
Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
+ Long minSampleSize = config.get(MONGO_CDC_MIN_SAMPLE_SIZE);
+ Long maxSampleSize = config.get(MONGO_CDC_MAX_SAMPLE_SIZE);
try (MongoClient mongoClient = MongoClients.create(settings)) {
MongoDatabase mongoDatabase =
mongoClient.getDatabase(databaseName);
MongoIterable<String> collectionNames =
mongoDatabase.listCollectionNames();
@@ -115,7 +129,10 @@ public class MongoDBDatabaseSync extends DatabaseSync {
}
long totalDocuments = collection.estimatedDocumentCount();
- long sampleSize = (long) Math.ceil(totalDocuments *
samplePercent);
+ long sampleSize =
+ calculateSampleSize(
+ totalDocuments, samplePercent, minSampleSize,
maxSampleSize);
+
ArrayList<Document> documents = sampleData(collection,
sampleSize);
MongoDBSchema mongoDBSchema =
new MongoDBSchema(documents, databaseName,
collectionName, null);
@@ -127,6 +144,19 @@ public class MongoDBDatabaseSync extends DatabaseSync {
return schemaList;
}
+ public long calculateSampleSize(
+ long totalDocuments, Double samplePercent, Long minSampleSize,
Long maxSampleSize) {
+ if (totalDocuments < minSampleSize) {
+ return totalDocuments;
+ }
+ long sampleSize = (long) Math.ceil(totalDocuments * samplePercent);
+ // If the number of samples is less than the minimum threshold, the minimum threshold is
+ // used, while ensuring that the number of samples does not exceed the maximum threshold
+ sampleSize = Math.max(sampleSize, minSampleSize);
+ sampleSize = Math.min(sampleSize, maxSampleSize);
+ return sampleSize;
+ }
+
private ArrayList<Document> sampleData(MongoCollection<Document>
collection, Long sampleNum) {
ArrayList<Document> query = new ArrayList<>();
query.add(new Document("$sample", new Document("size", sampleNum)));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java
new file mode 100644
index 00000000..94c6c51c
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoDBDatabaseSyncTest {
+ private MongoDBDatabaseSync mongoDBDatabaseSync;
+
+ @Before
+ public void init() throws SQLException {
+ mongoDBDatabaseSync = new MongoDBDatabaseSync();
+ }
+
+ @Test
+ public void testCalculateSampleSize() {
+ long sampleSize1 = mongoDBDatabaseSync.calculateSampleSize(100L, 0.2,
1000L, 100000L);
+ long sampleSize2 = mongoDBDatabaseSync.calculateSampleSize(1000L, 0.2,
1000L, 100000L);
+ long sampleSize3 = mongoDBDatabaseSync.calculateSampleSize(2000L, 0.2,
1000L, 100000L);
+ long sampleSize4 = mongoDBDatabaseSync.calculateSampleSize(10000L,
0.2, 1000L, 100000L);
+ long sampleSize5 = mongoDBDatabaseSync.calculateSampleSize(1000000L,
0.2, 1000L, 100000L);
+ assertEquals(100, sampleSize1);
+ assertEquals(1000, sampleSize2);
+ assertEquals(1000, sampleSize3);
+ assertEquals(2000, sampleSize4);
+ assertEquals(100000, sampleSize5);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]