This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8a7fe6fcb [Improve] mongodb connector v2 add source query capability
(#3697)
8a7fe6fcb is described below
commit 8a7fe6fcb6efa31e98c3c40187554f09502fdb45
Author: monster <[email protected]>
AuthorDate: Mon Dec 26 15:15:41 2022 +0800
[Improve] mongodb connector v2 add source query capability (#3697)
* [Improve] mongodb connector v2 add source query capability
---
docs/en/connector-v2/source/MongoDB.md | 6 +
.../seatunnel/mongodb/config/MongodbConfig.java | 62 +++--
.../{MongodbConfig.java => MongodbOption.java} | 39 ++-
.../mongodb/config/MongodbParameters.java | 33 ---
.../seatunnel/mongodb/sink/MongodbSink.java | 13 +-
.../seatunnel/mongodb/sink/MongodbSinkFactory.java | 6 +-
.../seatunnel/mongodb/sink/MongodbSinkWriter.java | 4 +-
.../seatunnel/mongodb/source/MongodbSource.java | 15 +-
.../mongodb/source/MongodbSourceFactory.java | 11 +-
.../mongodb/source/MongodbSourceReader.java | 10 +-
.../e2e/flink/v2/mongodb/MongodbMatchQueryIT.java | 285 +++++++++++++++++++++
.../mongodb_source_matchQuery_and_sink.conf | 66 +++++
12 files changed, 443 insertions(+), 107 deletions(-)
diff --git a/docs/en/connector-v2/source/MongoDB.md
b/docs/en/connector-v2/source/MongoDB.md
index 619ad66aa..8b6325706 100644
--- a/docs/en/connector-v2/source/MongoDB.md
+++ b/docs/en/connector-v2/source/MongoDB.md
@@ -22,6 +22,7 @@ Read data from MongoDB.
| uri | string | yes | - |
| database | string | yes | - |
| collection | string | yes | - |
+| matchQuery | string | no | - |
| schema | object | yes | - |
| common-options | config | no | - |
@@ -37,6 +38,10 @@ MongoDB database
MongoDB collection
+### matchQuery [string]
+
+MatchQuery is a JSON string that specifies the selection criteria using query
operators for the documents to be returned from the collection.
+
### schema [object]
#### fields [Config]
@@ -66,6 +71,7 @@ mongodb {
uri =
"mongodb://username:[email protected]:27017/mypost?retryWrites=true&writeConcern=majority"
database = "mydatabase"
collection = "mycollection"
+ matchQuery = "{"id":3}"
schema {
fields {
id = int
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index d38f5134b..058848b2d 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -17,38 +17,50 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Builder;
+import lombok.Getter;
import java.io.Serializable;
+
+
/**
* The config of mongodb
*/
+@Builder
+@Getter
public class MongodbConfig implements Serializable {
- public static final Option<String> URI =
- Options.key("uri")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB uri");
-
- public static final Option<String> DATABASE =
- Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB database name");
-
- public static final Option<String> COLLECTION =
- Options.key("collection")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB collection");
-
- // Don't use now
- public static final String FORMAT = "format";
-
- // Don't use now
- public static final String DEFAULT_FORMAT = "json";
+ @Builder.Default
+ private String uri = URI.defaultValue();
+ @Builder.Default
+ private String database = DATABASE.defaultValue();
+ @Builder.Default
+ private String collection = COLLECTION.defaultValue();
+ @Builder.Default
+ private String matchQuery = MATCHQUERY.defaultValue();
+ public static MongodbConfig buildWithConfig(Config config) {
+ MongodbConfigBuilder builder = MongodbConfig.builder();
+ if (config.hasPath(URI.key())) {
+ builder.uri(config.getString(URI.key()));
+ }
+ if (config.hasPath(DATABASE.key())) {
+ builder.database(config.getString(DATABASE.key()));
+ }
+ if (config.hasPath(COLLECTION.key())) {
+ builder.collection(config.getString(COLLECTION.key()));
+ }
+ if (config.hasPath(MATCHQUERY.key())) {
+ builder.matchQuery(config.getString(MATCHQUERY.key()));
+ }
+ return builder.build();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbOption.java
similarity index 59%
copy from
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
copy to
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbOption.java
index d38f5134b..9b7b5de68 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbOption.java
@@ -20,35 +20,34 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import java.io.Serializable;
-
-/**
- * The config of mongodb
- */
-public class MongodbConfig implements Serializable {
-
+public class MongodbOption {
public static final Option<String> URI =
- Options.key("uri")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB uri");
+ Options.key("uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MongoDB uri");
public static final Option<String> DATABASE =
- Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB database name");
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MongoDB database name");
public static final Option<String> COLLECTION =
- Options.key("collection")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB collection");
+ Options.key("collection")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MongoDB collection");
+
+ public static final Option<String> MATCHQUERY =
+ Options.key("matchQuery")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MatchQuery is a JSON string that
specifies the selection criteria using query operators for the documents to be
returned from the collection.\n");
// Don't use now
public static final String FORMAT = "format";
// Don't use now
public static final String DEFAULT_FORMAT = "json";
-
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
deleted file mode 100644
index 713d5421e..000000000
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class MongodbParameters implements Serializable {
-
- private String uri;
-
- private String database;
-
- private String collection;
-
-}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index f8e69f6dc..ffb5cc8db 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -34,11 +34,10 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
import com.google.auto.service.AutoService;
@@ -49,7 +48,7 @@ public class MongodbSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
private SeaTunnelRowType rowType;
- private MongodbParameters params;
+ private MongodbConfig params;
@Override
public String getPluginName() {
@@ -65,7 +64,7 @@ public class MongodbSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
getPluginName(), PluginType.SINK, result.getMsg()));
}
- this.params = ConfigBeanFactory.create(config,
MongodbParameters.class);
+ this.params = MongodbConfig.buildWithConfig(config);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index af1c365e9..14dfa7056 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
index c18f47207..55218299c 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
@@ -20,7 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
@@ -49,7 +49,7 @@ public class MongodbSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
public MongodbSinkWriter(SeaTunnelRowType rowType,
boolean useSimpleTextSchema,
- MongodbParameters params) {
+ MongodbConfig params) {
this.rowType = rowType;
this.database = params.getDatabase();
this.collection = params.getCollection();
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index 545069b87..6abf26e7d 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -35,11 +35,10 @@ import
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
import com.google.auto.service.AutoService;
@@ -48,7 +47,7 @@ public class MongodbSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
private SeaTunnelRowType rowType;
- private MongodbParameters params;
+ private MongodbConfig params;
@Override
public String getPluginName() {
@@ -63,9 +62,7 @@ public class MongodbSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
-
- this.params = ConfigBeanFactory.create(config,
MongodbParameters.class);
-
+ this.params = MongodbConfig.buildWithConfig(config);
if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
this.rowType =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
index 77449bbdd..d8c399d59 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -37,6 +38,8 @@ public class MongodbSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(URI, DATABASE, COLLECTION,
SeaTunnelSchema.SCHEMA).build();
+ return OptionRule.builder()
+ .required(URI, DATABASE, COLLECTION, SeaTunnelSchema.SCHEMA)
+ .optional(MATCHQUERY).build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
index 3391205d8..208a44510 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Deserializer;
@@ -32,10 +32,12 @@ import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import lombok.extern.slf4j.Slf4j;
+import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.io.IOException;
+import java.util.Optional;
@Slf4j
public class MongodbSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
@@ -44,7 +46,7 @@ public class MongodbSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow>
private MongoClient client;
- private final MongodbParameters params;
+ private final MongodbConfig params;
private final Deserializer deserializer;
@@ -53,7 +55,7 @@ public class MongodbSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow>
private final boolean useSimpleTextSchema;
MongodbSourceReader(SingleSplitReaderContext context,
- MongodbParameters params,
+ MongodbConfig params,
SeaTunnelRowType rowType,
boolean useSimpleTextSchema) {
this.context = context;
@@ -86,7 +88,7 @@ public class MongodbSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow>
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try (MongoCursor<Document> mongoCursor =
client.getDatabase(params.getDatabase())
.getCollection(params.getCollection())
- .find()
+ .find(Optional.ofNullable(params.getMatchQuery()).isPresent() ?
BsonDocument.parse(params.getMatchQuery()) : new BsonDocument())
.projection(projectionFields)
.iterator()) {
while (mongoCursor.hasNext()) {
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
new file mode 100644
index 000000000..099efae75
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Sorts;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class MongodbMatchQueryIT extends FlinkContainer {
+
+ private static final String MONGODB_IMAGE = "mongo:latest";
+ private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
+ private static final int MONGODB_PORT = 27017;
+ private static final String MONGODB_DATABASE = "test_db";
+ private static final String MONGODB_SOURCE_TABLE =
"source_matchQuery_table";
+ private static final String MONGODB_SINK_TABLE = "sink_matchQuery_table";
+
+ private static final List<Document> TEST_DATASET = generateTestDataSet();
+ private static final List<Document> RESULT_DATASET =
generateResultDataSet();
+
+ private GenericContainer<?> mongodbContainer;
+ private MongoClient client;
+
+ @BeforeEach
+ public void startMongoContainer() {
+ DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+ mongodbContainer = new GenericContainer<>(imageName)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MONGODB_CONTAINER_HOST)
+ .withExposedPorts(MONGODB_PORT)
+ .waitingFor(new HttpWaitStrategy()
+ .forPort(MONGODB_PORT)
+ .forStatusCodeMatching(response -> response == HTTP_OK ||
response == HTTP_UNAUTHORIZED)
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
+ Startables.deepStart(Stream.of(mongodbContainer)).join();
+ log.info("Mongodb container started");
+
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+ this.initSourceData();
+ }
+
+ @Test
+ public void testMongodb() throws IOException, InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertIterableEquals(
+ RESULT_DATASET.stream()
+ .map(e -> {
+ e.remove("_id");
+ return e;
+ })
+ .collect(Collectors.toList()),
+ readSinkData().stream()
+ .map(e -> {
+ e.remove("_id");
+ return e;
+ })
+ .collect(Collectors.toList()));
+ }
+
+ public void initConnection() {
+ String host = mongodbContainer.getContainerIpAddress();
+ int port = mongodbContainer.getFirstMappedPort();
+ String url = String.format("mongodb://%s:%d/%s", host, port,
MONGODB_DATABASE);
+
+ client = MongoClients.create(url);
+ }
+
+ private void initSourceData() {
+ MongoCollection<Document> sourceTable = client
+ .getDatabase(MONGODB_DATABASE)
+ .getCollection(MONGODB_SOURCE_TABLE);
+
+ sourceTable.deleteMany(new Document());
+ sourceTable.insertMany(TEST_DATASET);
+ }
+
+ private List<Document> readSinkData() {
+ MongoCollection<Document> sinkTable = client
+ .getDatabase(MONGODB_DATABASE)
+ .getCollection(MONGODB_SINK_TABLE);
+ MongoCursor<Document> cursor = sinkTable.find()
+ .sort(Sorts.ascending("id"))
+ .cursor();
+ List<Document> documents = new ArrayList<>();
+ while (cursor.hasNext()) {
+ documents.add(cursor.next());
+ }
+ return documents;
+ }
+
+ private static List<Document> generateTestDataSet() {
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ }
+ );
+ Serializer serializer = new DefaultSerializer(seatunnelRowType);
+
+ List<Document> documents = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ });
+ documents.add(serializer.serialize(row));
+ }
+ return documents;
+ }
+
+ private static List<Document> generateResultDataSet() {
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE
+ }
+ );
+ Serializer serializer = new DefaultSerializer(seatunnelRowType);
+
+ List<Document> documents = new ArrayList<>();
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(3),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ });
+ documents.add(serializer.serialize(row));
+ return documents;
+ }
+
+ @AfterEach
+ public void closeMongoContainer() {
+ if (client != null) {
+ client.close();
+ }
+ if (mongodbContainer != null) {
+ mongodbContainer.close();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
new file mode 100644
index 000000000..2a67c1ea6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ MongoDB {
+ uri =
"mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "source_matchQuery_table"
+ matchQuery = "{"id":3}"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ MongoDB {
+ uri =
"mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "sink_matchQuery_table"
+ }
+}
\ No newline at end of file