This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c5159a2760 [Improve][Connector-V2][MongoDB] Support to convert to
double from any numeric type (#6997)
c5159a2760 is described below
commit c5159a2760ecf8deb377e9b1b18367d29199aaf4
Author: Dongyeon Lee <[email protected]>
AuthorDate: Wed Jun 19 06:21:17 2024 -0700
[Improve][Connector-V2][MongoDB] Support to convert to double from any
numeric type (#6997)
---
release-note.md | 1 +
.../mongodb/serde/BsonToRowDataConverters.java | 2 +-
.../connector/v2/mongodb/AbstractMongodbIT.java | 140 ++++++++++++---------
.../e2e/connector/v2/mongodb/MongodbIT.java | 16 +++
.../src/test/resources/mongodb_double_value.conf | 88 +++++++++++++
5 files changed, 187 insertions(+), 60 deletions(-)
diff --git a/release-note.md b/release-note.md
index 840648fe64..24455c40ac 100644
--- a/release-note.md
+++ b/release-note.md
@@ -55,6 +55,7 @@
- [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue
#5546
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
+- [Connector-v2] [Mongodb] Support converting any numeric value stored by MongoDB (e.g. int32, int64) to double (#6997)
### Zeta(ST-Engine)
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
index a343e0cd2d..505b30fcbd 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
@@ -353,7 +353,7 @@ public class BsonToRowDataConverters implements
Serializable {
}
private static double convertToDouble(BsonValue bsonValue) {
- if (bsonValue.isDouble()) {
+ if (bsonValue.isNumber()) {
return bsonValue.asNumber().doubleValue();
}
throw new MongodbConnectorException(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index dd24dc1e60..cf0b1dbd09 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -36,6 +36,7 @@ import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Sorts;
+import com.mongodb.client.result.InsertManyResult;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
@@ -60,6 +61,9 @@ public abstract class AbstractMongodbIT extends TestSuiteBase
implements TestRes
protected static final List<Document> TEST_NULL_DATASET =
generateTestDataSetWithNull(10);
+ // Five documents whose top-level c_int is 0-4 and whose top-level c_double is the
+ // matching preset (44.0-44.4); every other field is random.
+ // NOTE(review): these presets are Java Doubles, which appear to be stored as BSON
+ // double, so this fixture does not cover int32/int64 values in c_double.
+ protected static final List<Document> TEST_DOUBLE_DATASET =
+ generateTestDataSetWithPresets(5, Arrays.asList(44.0d, 44.1d,
44.2d, 44.3d, 44.4d));
+
protected static final String MONGODB_IMAGE = "mongo:latest";
protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
@@ -76,6 +80,10 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
protected static final String MONGODB_NULL_TABLE_RESULT =
"test_null_op_db_result";
+ protected static final String MONGODB_DOUBLE_TABLE = "test_double_op_db";
+
+ protected static final String MONGODB_DOUBLE_TABLE_RESULT =
"test_double_op_db_result";
+
protected static final String MONGODB_MATCH_RESULT_TABLE =
"test_match_op_result_db";
protected static final String MONGODB_SPLIT_RESULT_TABLE =
"test_split_op_result_db";
@@ -105,20 +113,10 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
}
protected void initSourceData() {
- MongoCollection<Document> sourceMatchTable =
-
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);
- sourceMatchTable.deleteMany(new Document());
- sourceMatchTable.insertMany(TEST_MATCH_DATASET);
-
- MongoCollection<Document> sourceSplitTable =
-
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);
- sourceSplitTable.deleteMany(new Document());
- sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
-
- MongoCollection<Document> sourceNullTable =
-
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE);
- sourceNullTable.deleteMany(new Document());
- sourceNullTable.insertMany(TEST_NULL_DATASET);
+ // Each call empties the named collection, then inserts its fixture dataset and
+ // fails if the number of inserted ids differs from the dataset size.
+ prepareInitDataInCollection(MONGODB_MATCH_TABLE, TEST_MATCH_DATASET);
+ prepareInitDataInCollection(MONGODB_SPLIT_TABLE, TEST_SPLIT_DATASET);
+ prepareInitDataInCollection(MONGODB_NULL_TABLE, TEST_NULL_DATASET);
+ prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET);
}
protected void clearDate(String table) {
@@ -129,51 +127,7 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
List<Document> dataSet = new ArrayList<>();
for (int i = 0; i < count; i++) {
- dataSet.add(
- new Document(
- "c_map",
- new Document("OQBqH", randomString())
- .append("rkvlO", randomString())
- .append("pCMEX", randomString())
- .append("DAgdj", randomString())
- .append("dsJag", randomString()))
- .append(
- "c_array",
- Arrays.asList(
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt()))
- .append("c_string", randomString())
- .append("c_boolean", RANDOM.nextBoolean())
- .append("c_int", i)
- .append("c_bigint", RANDOM.nextLong())
- .append("c_double", RANDOM.nextDouble() *
Double.MAX_VALUE)
- .append(
- "c_row",
- new Document(
- "c_map",
- new Document("OQBqH",
randomString())
- .append("rkvlO",
randomString())
- .append("pCMEX",
randomString())
- .append("DAgdj",
randomString())
- .append("dsJag",
randomString()))
- .append(
- "c_array",
- Arrays.asList(
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt()))
- .append("c_string", randomString())
- .append("c_boolean",
RANDOM.nextBoolean())
- .append("c_int", RANDOM.nextInt())
- .append("c_bigint",
RANDOM.nextLong())
- .append(
- "c_double",
- RANDOM.nextDouble() *
Double.MAX_VALUE)));
+ dataSet.add(generateData(i, RANDOM.nextDouble() *
Double.MAX_VALUE));
}
return dataSet;
}
@@ -195,6 +149,17 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
return dataSet;
}
+    /**
+     * Builds {@code count} test documents. Document {@code i} gets {@code c_int = i} and a
+     * top-level {@code c_double} taken, in order, from {@code doublePresets}; all other fields
+     * are random.
+     *
+     * @param count number of documents to build
+     * @param doublePresets values for the top-level {@code c_double}; must contain at least
+     *     {@code count} elements
+     * @return a new mutable list of {@code count} documents
+     * @throws IllegalArgumentException if {@code count} is negative or exceeds the number of
+     *     presets
+     */
+    public static List<Document> generateTestDataSetWithPresets(
+            int count, List<Double> doublePresets) {
+        // Fail fast with a clear message instead of an IndexOutOfBoundsException mid-loop.
+        if (count < 0 || count > doublePresets.size()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "count must be between 0 and %d (number of presets), but was %d",
+                            doublePresets.size(), count));
+        }
+
+        List<Document> dataSet = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            dataSet.add(generateData(i, doublePresets.get(i)));
+        }
+        return dataSet;
+    }
+
protected static String randomString() {
int length = RANDOM.nextInt(10) + 1;
StringBuilder sb = new StringBuilder(length);
@@ -205,6 +170,63 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
return sb.toString();
}
+    /**
+     * Builds one test document.
+     *
+     * <p>The top-level {@code c_int} and {@code c_double} are the given presets. The other
+     * top-level fields are random, and the nested {@code c_row} document is entirely random.
+     *
+     * @param intPreset value stored in the top-level {@code c_int}
+     * @param doublePreset value stored in the top-level {@code c_double}
+     * @return the new document
+     */
+    private static Document generateData(int intPreset, Double doublePreset) {
+        // c_row is appended last so the random nested fields are drawn after the top-level ones.
+        return new Document("c_map", randomStringMap())
+                .append("c_array", randomIntArray())
+                .append("c_string", randomString())
+                .append("c_boolean", RANDOM.nextBoolean())
+                .append("c_int", intPreset)
+                .append("c_bigint", RANDOM.nextLong())
+                .append("c_double", doublePreset)
+                .append("c_row", randomNestedRow());
+    }
+
+    /** Builds the nested {@code c_row} document; every field, including c_int/c_double, is random. */
+    private static Document randomNestedRow() {
+        return new Document("c_map", randomStringMap())
+                .append("c_array", randomIntArray())
+                .append("c_string", randomString())
+                .append("c_boolean", RANDOM.nextBoolean())
+                .append("c_int", RANDOM.nextInt())
+                .append("c_bigint", RANDOM.nextLong())
+                .append("c_double", RANDOM.nextDouble() * Double.MAX_VALUE);
+    }
+
+    /** Builds a {@code c_map} value: five fixed keys, each mapped to a random string. */
+    private static Document randomStringMap() {
+        return new Document("OQBqH", randomString())
+                .append("rkvlO", randomString())
+                .append("pCMEX", randomString())
+                .append("DAgdj", randomString())
+                .append("dsJag", randomString());
+    }
+
+    /** Builds a {@code c_array} value: a list of five random ints. */
+    private static List<Integer> randomIntArray() {
+        return Arrays.asList(
+                RANDOM.nextInt(),
+                RANDOM.nextInt(),
+                RANDOM.nextInt(),
+                RANDOM.nextInt(),
+                RANDOM.nextInt());
+    }
+
+    /**
+     * Empties {@code collection} in {@code MONGODB_DATABASE}, then inserts {@code dataSet} into it.
+     *
+     * @param collection name of the collection to reset and seed
+     * @param dataSet documents to insert; if empty, the collection is only cleared
+     * @throws IllegalStateException if the insert reports a different number of inserted ids
+     *     than documents sent
+     */
+    private void prepareInitDataInCollection(String collection, List<Document> dataSet) {
+        MongoCollection<Document> source =
+                client.getDatabase(MONGODB_DATABASE).getCollection(collection);
+        source.deleteMany(new Document());
+
+        // insertMany rejects an empty list, so an empty fixture only needs the delete above.
+        if (dataSet.isEmpty()) {
+            return;
+        }
+
+        InsertManyResult result = source.insertMany(dataSet);
+        int insertedCount = result.getInsertedIds().size();
+        if (insertedCount != dataSet.size()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Insertion count mismatch for collection %s: expected %d, inserted %d",
+                            collection, dataSet.size(), insertedCount));
+        }
+    }
+
protected List<Document> readMongodbData(String collection) {
MongoCollection<Document> sinkTable =
client.getDatabase(MONGODB_DATABASE).getCollection(collection);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 8ff1f22329..b289af315f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -221,4 +221,20 @@ public class MongodbIT extends AbstractMongodbIT {
clearDate(MONGODB_TRANSACTION_SINK_TABLE);
clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
}
+
+    /**
+     * Runs {@code /mongodb_double_value.conf}, which copies {@code test_double_op_db} into
+     * {@code test_double_op_db_result} with {@code c_double} declared as double. It then checks
+     * that the result documents equal TEST_DOUBLE_DATASET, ignoring {@code _id}.
+     *
+     * <p>NOTE(review): TEST_DOUBLE_DATASET is built from Java {@code Double} presets, which appear
+     * to be stored as BSON double. The int32/int64 values now accepted by convertToDouble are
+     * therefore not exercised here yet.
+     */
+    @TestTemplate
+    public void testMongodbDoubleValue(TestContainer container)
+            throws IOException, InterruptedException {
+        try {
+            Container.ExecResult assertSinkResult =
+                    container.executeJob("/mongodb_double_value.conf");
+            Assertions.assertEquals(
+                    0, assertSinkResult.getExitCode(), assertSinkResult.getStderr());
+
+            Assertions.assertIterableEquals(
+                    TEST_DOUBLE_DATASET.stream()
+                            .peek(e -> e.remove("_id"))
+                            .collect(Collectors.toList()),
+                    readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream()
+                            .peek(e -> e.remove("_id"))
+                            .collect(Collectors.toList()));
+        } finally {
+            // Clean up even when an assertion fails. @TestTemplate may invoke this method once per
+            // engine container (presumably against the same MongoDB instance), and leftover result
+            // documents would make later invocations fail too.
+            clearDate(MONGODB_DOUBLE_TABLE_RESULT);
+        }
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf
new file mode 100644
index 0000000000..0f0c01483a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Run as a single-parallelism batch job.
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+# Read the seeded test_double_op_db collection. c_double (top level and inside c_row)
+# is declared as double, so every stored c_double value must be convertible to double.
+source {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db"
+ database = "test_db"
+ collection = "test_double_op_db"
+ result_table_name = "mongodb_table"
+ cursor.no-timeout = true
+ fetch.size = 1000
+ max.time-min = 100
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ }
+ }
+ }
+ }
+}
+
+# Write the rows unchanged (no transform is configured) to test_double_op_db_result;
+# the e2e test compares that collection against the source dataset.
+sink {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+ database = "test_db"
+ collection = "test_double_op_db_result"
+ source_table_name = "mongodb_table"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ }
+ }
+ }
+ }
+}