This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8a7fe6fcb [Improve] mongodb connector v2 add source query capability 
(#3697)
8a7fe6fcb is described below

commit 8a7fe6fcb6efa31e98c3c40187554f09502fdb45
Author: monster <[email protected]>
AuthorDate: Mon Dec 26 15:15:41 2022 +0800

    [Improve] mongodb connector v2 add source query capability (#3697)
    
    * [Improve] mongodb connector v2 add source query capability
---
 docs/en/connector-v2/source/MongoDB.md             |   6 +
 .../seatunnel/mongodb/config/MongodbConfig.java    |  62 +++--
 .../{MongodbConfig.java => MongodbOption.java}     |  39 ++-
 .../mongodb/config/MongodbParameters.java          |  33 ---
 .../seatunnel/mongodb/sink/MongodbSink.java        |  13 +-
 .../seatunnel/mongodb/sink/MongodbSinkFactory.java |   6 +-
 .../seatunnel/mongodb/sink/MongodbSinkWriter.java  |   4 +-
 .../seatunnel/mongodb/source/MongodbSource.java    |  15 +-
 .../mongodb/source/MongodbSourceFactory.java       |  11 +-
 .../mongodb/source/MongodbSourceReader.java        |  10 +-
 .../e2e/flink/v2/mongodb/MongodbMatchQueryIT.java  | 285 +++++++++++++++++++++
 .../mongodb_source_matchQuery_and_sink.conf        |  66 +++++
 12 files changed, 443 insertions(+), 107 deletions(-)

diff --git a/docs/en/connector-v2/source/MongoDB.md 
b/docs/en/connector-v2/source/MongoDB.md
index 619ad66aa..8b6325706 100644
--- a/docs/en/connector-v2/source/MongoDB.md
+++ b/docs/en/connector-v2/source/MongoDB.md
@@ -22,6 +22,7 @@ Read data from MongoDB.
 | uri            | string | yes      | -             |
 | database       | string | yes      | -             |
 | collection     | string | yes      | -             |
+| matchQuery     | string | no       | -             |
 | schema         | object | yes      | -             |
 | common-options | config | no       | -             |
 
@@ -37,6 +38,10 @@ MongoDB database
 
 MongoDB collection
 
+### matchQuery [string]
+
+MatchQuery is a JSON string that specifies the selection criteria using query 
operators for the documents to be returned from the collection.
+
 ### schema [object]
 
 #### fields [Config]
@@ -66,6 +71,7 @@ mongodb {
     uri = 
"mongodb://username:[email protected]:27017/mypost?retryWrites=true&writeConcern=majority"
     database = "mydatabase"
     collection = "mycollection"
+    matchQuery = "{"id":3}"
     schema {
       fields {
         id = int
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index d38f5134b..058848b2d 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -17,38 +17,50 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Builder;
+import lombok.Getter;
 
 import java.io.Serializable;
 
+
+
 /**
  * The config of mongodb
  */
+@Builder
+@Getter
 public class MongodbConfig implements Serializable {
 
-    public static final Option<String> URI =
-        Options.key("uri")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("MongoDB uri");
-
-    public static final Option<String> DATABASE =
-        Options.key("database")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("MongoDB database name");
-
-    public static final Option<String> COLLECTION =
-        Options.key("collection")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("MongoDB collection");
-
-    // Don't use now
-    public static final String FORMAT = "format";
-
-    // Don't use now
-    public static final String DEFAULT_FORMAT = "json";
+    @Builder.Default
+    private String uri = URI.defaultValue();
+    @Builder.Default
+    private String database = DATABASE.defaultValue();
+    @Builder.Default
+    private String collection = COLLECTION.defaultValue();
+    @Builder.Default
+    private String matchQuery = MATCHQUERY.defaultValue();
+    public static MongodbConfig buildWithConfig(Config config) {
+        MongodbConfigBuilder builder = MongodbConfig.builder();
+        if (config.hasPath(URI.key())) {
+            builder.uri(config.getString(URI.key()));
+        }
+        if (config.hasPath(DATABASE.key())) {
+            builder.database(config.getString(DATABASE.key()));
+        }
+        if (config.hasPath(COLLECTION.key())) {
+            builder.collection(config.getString(COLLECTION.key()));
+        }
+        if (config.hasPath(MATCHQUERY.key())) {
+            builder.matchQuery(config.getString(MATCHQUERY.key()));
+        }
+        return builder.build();
+    }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbOption.java
similarity index 59%
copy from 
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
copy to 
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbOption.java
index d38f5134b..9b7b5de68 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbOption.java
@@ -20,35 +20,34 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import java.io.Serializable;
-
-/**
- * The config of mongodb
- */
-public class MongodbConfig implements Serializable {
-
+public class MongodbOption {
     public static final Option<String> URI =
-        Options.key("uri")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("MongoDB uri");
+            Options.key("uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MongoDB uri");
 
     public static final Option<String> DATABASE =
-        Options.key("database")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("MongoDB database name");
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MongoDB database name");
 
     public static final Option<String> COLLECTION =
-        Options.key("collection")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("MongoDB collection");
+            Options.key("collection")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MongoDB collection");
+
+    public static final Option<String> MATCHQUERY =
+            Options.key("matchQuery")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MatchQuery is a JSON string that 
specifies the selection criteria using query operators for the documents to be 
returned from the collection.\n");
 
     // Don't use now
     public static final String FORMAT = "format";
 
     // Don't use now
     public static final String DEFAULT_FORMAT = "json";
-
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
deleted file mode 100644
index 713d5421e..000000000
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class MongodbParameters implements Serializable {
-
-    private String uri;
-
-    private String database;
-
-    private String collection;
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index f8e69f6dc..ffb5cc8db 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -34,11 +34,10 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
 
 import com.google.auto.service.AutoService;
 
@@ -49,7 +48,7 @@ public class MongodbSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     private SeaTunnelRowType rowType;
 
-    private MongodbParameters params;
+    private MongodbConfig params;
 
     @Override
     public String getPluginName() {
@@ -65,7 +64,7 @@ public class MongodbSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
                     getPluginName(), PluginType.SINK, result.getMsg()));
         }
 
-        this.params = ConfigBeanFactory.create(config, 
MongodbParameters.class);
+        this.params = MongodbConfig.buildWithConfig(config);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index af1c365e9..14dfa7056 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.factory.Factory;
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
index c18f47207..55218299c 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
@@ -20,7 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
 
@@ -49,7 +49,7 @@ public class MongodbSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
     public MongodbSinkWriter(SeaTunnelRowType rowType,
                              boolean useSimpleTextSchema,
-                             MongodbParameters params) {
+                             MongodbConfig params) {
         this.rowType = rowType;
         this.database = params.getDatabase();
         this.collection = params.getCollection();
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index 545069b87..6abf26e7d 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -35,11 +35,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
 
 import com.google.auto.service.AutoService;
 
@@ -48,7 +47,7 @@ public class MongodbSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     private SeaTunnelRowType rowType;
 
-    private MongodbParameters params;
+    private MongodbConfig params;
 
     @Override
     public String getPluginName() {
@@ -63,9 +62,7 @@ public class MongodbSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
                 String.format("PluginName: %s, PluginType: %s, Message: %s",
                     getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-
-        this.params = ConfigBeanFactory.create(config, 
MongodbParameters.class);
-
+        this.params = MongodbConfig.buildWithConfig(config);
         if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
             Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
             this.rowType = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
index 77449bbdd..d8c399d59 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
@@ -17,9 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -37,6 +38,8 @@ public class MongodbSourceFactory implements 
TableSourceFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION, 
SeaTunnelSchema.SCHEMA).build();
+        return OptionRule.builder()
+                .required(URI, DATABASE, COLLECTION, SeaTunnelSchema.SCHEMA)
+                .optional(MATCHQUERY).build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
index 3391205d8..208a44510 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultDeserializer;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Deserializer;
 
@@ -32,10 +32,12 @@ import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.Projections;
 import lombok.extern.slf4j.Slf4j;
+import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
 import java.io.IOException;
+import java.util.Optional;
 
 @Slf4j
 public class MongodbSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
@@ -44,7 +46,7 @@ public class MongodbSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow>
 
     private MongoClient client;
 
-    private final MongodbParameters params;
+    private final MongodbConfig params;
 
     private final Deserializer deserializer;
 
@@ -53,7 +55,7 @@ public class MongodbSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow>
     private final boolean useSimpleTextSchema;
 
     MongodbSourceReader(SingleSplitReaderContext context,
-                        MongodbParameters params,
+                        MongodbConfig params,
                         SeaTunnelRowType rowType,
                         boolean useSimpleTextSchema) {
         this.context = context;
@@ -86,7 +88,7 @@ public class MongodbSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow>
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         try (MongoCursor<Document> mongoCursor = 
client.getDatabase(params.getDatabase())
             .getCollection(params.getCollection())
-            .find()
+            .find(Optional.ofNullable(params.getMatchQuery()).isPresent() ? 
BsonDocument.parse(params.getMatchQuery()) : new BsonDocument())
             .projection(projectionFields)
             .iterator()) {
             while (mongoCursor.hasNext()) {
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
new file mode 100644
index 000000000..099efae75
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Sorts;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class MongodbMatchQueryIT extends FlinkContainer {
+
+    private static final String MONGODB_IMAGE = "mongo:latest";
+    private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
+    private static final int MONGODB_PORT = 27017;
+    private static final String MONGODB_DATABASE = "test_db";
+    private static final String MONGODB_SOURCE_TABLE = 
"source_matchQuery_table";
+    private static final String MONGODB_SINK_TABLE = "sink_matchQuery_table";
+
+    private static final List<Document> TEST_DATASET = generateTestDataSet();
+    private static final List<Document> RESULT_DATASET = 
generateResultDataSet();
+
+    private GenericContainer<?> mongodbContainer;
+    private MongoClient client;
+
+    @BeforeEach
+    public void startMongoContainer() {
+        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+        mongodbContainer = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MONGODB_CONTAINER_HOST)
+            .withExposedPorts(MONGODB_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                .forPort(MONGODB_PORT)
+                .forStatusCodeMatching(response -> response == HTTP_OK || 
response == HTTP_UNAUTHORIZED)
+                .withStartupTimeout(Duration.ofMinutes(2)))
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
+        Startables.deepStart(Stream.of(mongodbContainer)).join();
+        log.info("Mongodb container started");
+
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+        this.initSourceData();
+    }
+
+    @Test
+    public void testMongodb() throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertIterableEquals(
+            RESULT_DATASET.stream()
+                .map(e -> {
+                    e.remove("_id");
+                    return e;
+                })
+                .collect(Collectors.toList()),
+            readSinkData().stream()
+                .map(e -> {
+                    e.remove("_id");
+                    return e;
+                })
+                .collect(Collectors.toList()));
+    }
+
+    public void initConnection() {
+        String host = mongodbContainer.getContainerIpAddress();
+        int port = mongodbContainer.getFirstMappedPort();
+        String url = String.format("mongodb://%s:%d/%s", host, port, 
MONGODB_DATABASE);
+
+        client = MongoClients.create(url);
+    }
+
+    private void initSourceData() {
+        MongoCollection<Document> sourceTable = client
+            .getDatabase(MONGODB_DATABASE)
+            .getCollection(MONGODB_SOURCE_TABLE);
+
+        sourceTable.deleteMany(new Document());
+        sourceTable.insertMany(TEST_DATASET);
+    }
+
+    private List<Document> readSinkData() {
+        MongoCollection<Document> sinkTable = client
+            .getDatabase(MONGODB_DATABASE)
+            .getCollection(MONGODB_SINK_TABLE);
+        MongoCursor<Document> cursor = sinkTable.find()
+            .sort(Sorts.ascending("id"))
+            .cursor();
+        List<Document> documents = new ArrayList<>();
+        while (cursor.hasNext()) {
+            documents.add(cursor.next());
+        }
+        return documents;
+    }
+
+    private static List<Document> generateTestDataSet() {
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id",
+                "c_map",
+                "c_array",
+                "c_string",
+                "c_boolean",
+                "c_tinyint",
+                "c_smallint",
+                "c_int",
+                "c_bigint",
+                "c_float",
+                "c_double",
+                "c_decimal",
+                "c_bytes",
+                "c_date"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                ArrayType.BYTE_ARRAY_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.BOOLEAN_TYPE,
+                BasicType.BYTE_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.DOUBLE_TYPE,
+                new DecimalType(2, 1),
+                PrimitiveByteArrayType.INSTANCE,
+                LocalTimeType.LOCAL_DATE_TYPE,
+            }
+        );
+        Serializer serializer = new DefaultSerializer(seatunnelRowType);
+
+        List<Document> documents = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    Long.valueOf(i),
+                    Collections.singletonMap("key", Short.parseShort("1")),
+                    new Byte[]{Byte.parseByte("1")},
+                    "string",
+                    Boolean.FALSE,
+                    Byte.parseByte("1"),
+                    Short.parseShort("1"),
+                    Integer.parseInt("1"),
+                    Long.parseLong("1"),
+                    Float.parseFloat("1.1"),
+                    Double.parseDouble("1.1"),
+                    BigDecimal.valueOf(11, 1),
+                    "test".getBytes(),
+                    LocalDate.now(),
+                });
+            documents.add(serializer.serialize(row));
+        }
+        return documents;
+    }
+
+    private static List<Document> generateResultDataSet() {
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id",
+                "c_map",
+                "c_array",
+                "c_string",
+                "c_boolean",
+                "c_tinyint",
+                "c_smallint",
+                "c_int",
+                "c_bigint",
+                "c_float",
+                "c_double",
+                "c_decimal",
+                "c_bytes",
+                "c_date"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                ArrayType.BYTE_ARRAY_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.BOOLEAN_TYPE,
+                BasicType.BYTE_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.DOUBLE_TYPE,
+                new DecimalType(2, 1),
+                PrimitiveByteArrayType.INSTANCE,
+                LocalTimeType.LOCAL_DATE_TYPE
+            }
+        );
+        Serializer serializer = new DefaultSerializer(seatunnelRowType);
+
+        List<Document> documents = new ArrayList<>();
+        SeaTunnelRow row = new SeaTunnelRow(
+            new Object[]{
+                Long.valueOf(3),
+                Collections.singletonMap("key", Short.parseShort("1")),
+                new Byte[]{Byte.parseByte("1")},
+                "string",
+                Boolean.FALSE,
+                Byte.parseByte("1"),
+                Short.parseShort("1"),
+                Integer.parseInt("1"),
+                Long.parseLong("1"),
+                Float.parseFloat("1.1"),
+                Double.parseDouble("1.1"),
+                BigDecimal.valueOf(11, 1),
+                "test".getBytes(),
+                LocalDate.now(),
+            });
+        documents.add(serializer.serialize(row));
+        return documents;
+    }
+
+    @AfterEach
+    public void closeMongoContainer() {
+        if (client != null) {
+            client.close();
+        }
+        if (mongodbContainer != null) {
+            mongodbContainer.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
new file mode 100644
index 000000000..2a67c1ea6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  MongoDB {
+    uri = 
"mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    database = "test_db"
+    collection = "source_matchQuery_table"
+    matchQuery = "{"id":3}"
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  MongoDB {
+    uri = 
"mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    database = "test_db"
+    collection = "sink_matchQuery_table"
+  }
+}
\ No newline at end of file


Reply via email to