zentol commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1094408110


##########
flink-connector-mongodb-e2e-tests/src/test/resources/e2e_append_only.sql:
##########
@@ -0,0 +1,44 @@
+--/*
+-- * Licensed to the Apache Software Foundation (ASF) under one
+-- * or more contributor license agreements.  See the NOTICE file
+-- * distributed with this work for additional information
+-- * regarding copyright ownership.  The ASF licenses this file
+-- * to you under the Apache License, Version 2.0 (the
+-- * "License"); you may not use this file except in compliance
+-- * with the License.  You may obtain a copy of the License at
+-- *
+-- *     http://www.apache.org/licenses/LICENSE-2.0
+-- *
+-- * Unless required by applicable law or agreed to in writing, software
+-- * distributed under the License is distributed on an "AS IS" BASIS,
+-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- * See the License for the specific language governing permissions and
+-- * limitations under the License.
+-- */
+
+DROP TABLE IF EXISTS orders;
+DROP TABLE IF EXISTS orders_bak;
+
+CREATE TABLE orders (
+  `_id` STRING,
+  `code` STRING,
+  `quantity` BIGINT,
+  PRIMARY KEY (_id) NOT ENFORCED
+) WITH (
+  'connector' = 'mongodb',
+  'uri' = 'mongodb://mongodb:27017',

Review Comment:
   Can we use a random port for MongoDB? This would reduce the chances of this test failing locally when something else happens to be running on that port.
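A minimal sketch of what this could look like on the test side (the `$MONGO_URI` placeholder, image tag, and helper class are illustrative assumptions, not part of this PR): let Testcontainers pick a random free host port and substitute the resulting connection string into the script before submitting it.

```java
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.utility.DockerImageName;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

class MongoE2ePortSketch {

    // Testcontainers maps 27017 to a random free host port, so no port is hardcoded.
    static final MongoDBContainer MONGO =
            new MongoDBContainer(DockerImageName.parse("mongo:6.0.1"));

    static String loadAppendOnlySql() throws Exception {
        MONGO.start();
        // The script would carry '$MONGO_URI' instead of 'mongodb://mongodb:27017'.
        String template =
                new String(
                        Files.readAllBytes(Paths.get("src/test/resources/e2e_append_only.sql")),
                        StandardCharsets.UTF_8);
        return template.replace("$MONGO_URI", MONGO.getConnectionString());
    }
}
```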



##########
flink-connector-mongodb-e2e-tests/pom.xml:
##########
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <artifactId>flink-connector-mongodb-parent</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.0-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>flink-connector-mongodb-e2e-tests</artifactId>
+       <name>Flink : E2E Tests : MongoDB</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Use fat jar so we don't need to create a user-jar. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-sql-connector-mongodb</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-mongodb</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mongodb</groupId>
+                       <artifactId>mongodb-driver-sync</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>mongodb</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <profiles>
+               <profile>
+                       <id>run-end-to-end-tests</id>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               <groupId>org.apache.maven.plugins</groupId>
+                                               <artifactId>maven-surefire-plugin</artifactId>
+                                               <executions>
+                                                       <execution>
+                                                               <id>end-to-end-tests</id>
+                                                               <phase>integration-test</phase>
+                                                               <goals>
+                                                                       <goal>test</goal>
+                                                               </goals>
+                                                               <configuration>
+                                                                       <includes>
+                                                                               <include>**/*.*</include>
+                                                                       </includes>
+                                                                       <systemPropertyVariables>
+                                                                               <moduleDir>${project.basedir}</moduleDir>
+                                                                       </systemPropertyVariables>
+                                                               </configuration>
+                                                       </execution>
+                                               </executions>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
+       </profiles>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-deploy-plugin</artifactId>
+                               <configuration>
+                                       <skip>true</skip>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>default-test</id>
+                                               <phase>none</phase>
+                                       </execution>
+                                       <execution>
+                                               <id>integration-tests</id>
+                                               <phase>none</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-dependency-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>copy</id>
+                                               <phase>pre-integration-test</phase>
+                                               <goals>
+                                                       <goal>copy</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactItems>
+                                                               <artifactItem>
+                                                                       <groupId>org.apache.flink</groupId>
+                                                                       <artifactId>flink-sql-connector-mongodb</artifactId>
+                                                                       <version>${project.version}</version>
+                                                                       <destFileName>sql-mongodb.jar</destFileName>
+                                                                       <type>jar</type>
+                                                                       <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                                               </artifactItem>
+                                                       </artifactItems>
+                                               </configuration>
+                                       </execution>
+                                       <execution>
+                                               <id>store-classpath-in-target-for-tests</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>build-classpath</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <outputFile>${project.build.directory}/hadoop.classpath</outputFile>
+                                                       <excludeGroupIds>org.apache.flink</excludeGroupIds>
+                                               </configuration>
+                                       </execution>

Review Comment:
   This appears unused.



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoValidationUtils.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.table.MongoKeyExtractor;
+import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.bson.BsonType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+/** Utility methods for validating MongoDB properties. */
+@Internal
+public class MongoValidationUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoValidationUtils.class);
+
+    public static final Set<LogicalTypeRoot> ALLOWED_PRIMARY_KEY_TYPES =
+            EnumSet.of(
+                    LogicalTypeRoot.CHAR,
+                    LogicalTypeRoot.VARCHAR,
+                    LogicalTypeRoot.BOOLEAN,
+                    LogicalTypeRoot.DECIMAL,
+                    LogicalTypeRoot.INTEGER,
+                    LogicalTypeRoot.BIGINT,
+                    LogicalTypeRoot.DOUBLE,
+                    LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                    LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                    LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+                    LogicalTypeRoot.INTERVAL_DAY_TIME);
+
+    private static final Set<LogicalTypeRoot> DENIED_PRIMARY_KEY_TYPES =
+            EnumSet.of(
+                    LogicalTypeRoot.BINARY,
+                    LogicalTypeRoot.VARBINARY,
+                    LogicalTypeRoot.TINYINT,
+                    LogicalTypeRoot.SMALLINT,
+                    LogicalTypeRoot.FLOAT,
+                    LogicalTypeRoot.DATE,
+                    LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+                    LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+                    LogicalTypeRoot.ARRAY,
+                    LogicalTypeRoot.MULTISET,
+                    LogicalTypeRoot.MAP,
+                    LogicalTypeRoot.ROW,
+                    LogicalTypeRoot.DISTINCT_TYPE,
+                    LogicalTypeRoot.STRUCTURED_TYPE,
+                    LogicalTypeRoot.NULL,
+                    LogicalTypeRoot.RAW,
+                    LogicalTypeRoot.SYMBOL,
+                    LogicalTypeRoot.UNRESOLVED);
+
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In MongoDB the
+     * primary key is used to calculate the MongoDB document _id, which may be of any {@link
+     * BsonType} other than a {@link BsonType#ARRAY}. Its value must be unique and immutable in the
+     * collection.
+     *
+     * <p>MongoDB creates a unique index on the _id field during the creation of a collection.
+     * There are also some constraints on the primary key index. For a more detailed introduction,
+     * refer to <a
+     * href="https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Index-Key-Limit">
+     * Index Key Limit</a>.
+     *
+     * <ul>
+     *   <li>Before MongoDB 4.2, the total size of an index entry, which can include structural
+     *       overhead depending on the BSON type, must be less than 1024 bytes.
+     *   <li>Starting in version 4.2, MongoDB removes the Index Key Limit.
+     * </ul>
+     *
+     * <p>As of now the _id is extracted by {@link MongoKeyExtractor} according to the primary key
+     * specified by the Flink table schema.
+     *
+     * <ul>
+     *   <li>When there is only a single field in the specified primary key, we convert the field
+     *       data to a BSON value and use it as the _id of the corresponding document.
+     *   <li>When there are multiple fields in the specified primary key, we convert and combine
+     *       these fields into a {@link BsonType#DOCUMENT} as the _id of the corresponding document.
+     *       For example, given the primary key statement <code>PRIMARY KEY (f1, f2) NOT ENFORCED
+     *       </code>, the extracted _id has the form <code>_id: {f1: v1, f2: v2}</code>.
+     * </ul>
+     *
+     * <p>The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and the {@link
+     * LogicalTypeRoot#RAW} type, as well as other types that cannot be converted to {@link
+     * BsonType} by {@link RowDataToBsonConverters}.
+     */
+    public static void validatePrimaryKey(DataType primaryKeyDataType) {
+        List<DataType> fieldDataTypes = DataType.getFieldDataTypes(primaryKeyDataType);
+        List<DataType> illegalTypes = new ArrayList<>();
+        for (DataType fieldType : fieldDataTypes) {
+            LogicalTypeRoot typeRoot = fieldType.getLogicalType().getTypeRoot();
+            if (!ALLOWED_PRIMARY_KEY_TYPES.contains(typeRoot)) {
+                illegalTypes.add(fieldType);
+                if (!DENIED_PRIMARY_KEY_TYPES.contains(typeRoot)) {
+                    LOG.warn(
+                            "Detected newly added root type {} that should be explicitly accepted or rejected.",

Review Comment:
   Maybe include something like "Please reach out to the Flink maintainers."
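For instance (a sketch; the exact wording is a suggestion, and `typeRoot` is the variable from the quoted code):

```java
LOG.warn(
        "Detected root type {} that is neither explicitly accepted nor rejected as a primary key type. "
                + "Please reach out to the Flink maintainers so it can be classified.",
        typeRoot);
```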



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoScanSplitAssigner.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
+import org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitters;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+
+import com.mongodb.MongoNamespace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** The split assigner for {@link MongoScanSourceSplit}. */
+@Internal
+public class MongoScanSplitAssigner implements MongoSplitAssigner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoScanSplitAssigner.class);
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoReadOptions readOptions;
+
+    private final LinkedList<String> remainingCollections;
+    private final List<String> alreadyProcessedCollections;
+    private final List<MongoScanSourceSplit> remainingScanSplits;
+    private final Map<String, MongoScanSourceSplit> assignedScanSplits;
+    private boolean initialized;
+
+    private transient MongoSplitters mongoSplitters;
+
+    public MongoScanSplitAssigner(
+            MongoConnectionOptions connectionOptions,
+            MongoReadOptions readOptions,
+            MongoSourceEnumState sourceEnumState) {
+        this.connectionOptions = connectionOptions;
+        this.readOptions = readOptions;
+        this.remainingCollections = new LinkedList<>(sourceEnumState.getRemainingCollections());

Review Comment:
   It seems odd that this uses a `LinkedList` to get queue-like semantics while `remainingScanSplits` is some opaque list.
   As a whole I'm wondering whether these should really be lists, or rather a `Deque`.
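A minimal sketch of consistent queue semantics, assuming `MongoSourceEnumState` also exposes a `getRemainingScanSplits()` accessor (the class name is a placeholder):

```java
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Sketch: both pending collections and pending splits are consumed from the head. */
class ScanSplitStateSketch {

    private final Deque<String> remainingCollections;
    private final Deque<MongoScanSourceSplit> remainingScanSplits;

    ScanSplitStateSketch(MongoSourceEnumState sourceEnumState) {
        this.remainingCollections = new ArrayDeque<>(sourceEnumState.getRemainingCollections());
        this.remainingScanSplits = new ArrayDeque<>(sourceEnumState.getRemainingScanSplits());
    }

    Optional<MongoScanSourceSplit> getNextSplit() {
        // poll() returns null on an empty deque instead of throwing, which fits assigner semantics.
        return Optional.ofNullable(remainingScanSplits.poll());
    }
}
```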



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java:
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.bson.BsonDocument;
+import org.bson.BsonType;
+import org.bson.BsonValue;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.DEFAULT_JSON_WRITER_SETTINGS;
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Tool class used to convert from {@link BsonValue} to {@link RowData}. */
+@Internal
+public class BsonToRowDataConverters {
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface BsonToRowDataConverter extends Serializable {
+        Object convert(BsonValue bsonValue);
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for
+    // sql-connector uber jars.
+    // --------------------------------------------------------------------------------
+
+    public static BsonToRowDataConverter createConverter(LogicalType type) {

Review Comment:
   This should only accept `RowType`.
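i.e. roughly (a sketch; `createRowConverter` stands in for whatever internal helper ends up building the per-field converters):

```java
// Only a row can be converted to RowData, so the public factory is typed accordingly.
public static BsonToRowDataConverter createConverter(RowType rowType) {
    return createRowConverter(rowType);
}
```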



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java:
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.bson.BsonDocument;
+import org.bson.BsonType;
+import org.bson.BsonValue;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.DEFAULT_JSON_WRITER_SETTINGS;
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Tool class used to convert from {@link BsonValue} to {@link RowData}. */
+@Internal
+public class BsonToRowDataConverters {
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface BsonToRowDataConverter extends Serializable {

Review Comment:
   This interface is problematic. It exists for 2 distinct use-cases but mixes the 2 and pays for it with a loss of type safety.
   
   There is 1 use-case outside of this class where we want to convert a `BsonValue` to a `RowData`, as the name implies. Currently this always requires an unchecked cast.
   
   But it's also used internally to extract arbitrary objects from a BsonValue, which could just as well be a `SerializableFunction<BsonValue, Object>`.
   
   It's just not really a `RowData` converter, but an arbitrary mapping between Table-supported types and BsonValues.
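A rough shape for that split (a sketch; the names are placeholders, and `SerializableFunction` refers to Flink's `org.apache.flink.util.function.SerializableFunction`):

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.function.SerializableFunction;

import org.bson.BsonDocument;
import org.bson.BsonValue;

import java.io.Serializable;

/** Sketch of separating the two use-cases; not the actual connector code. */
public class BsonConvertersSketch {

    /** Public, typed converter: callers get a RowData without an unchecked cast. */
    @FunctionalInterface
    public interface BsonToRowDataConverter extends Serializable {
        RowData convert(BsonDocument document);
    }

    /**
     * Internal per-field mapping between BsonValue and arbitrary Table-supported objects,
     * which is all the nested converters need.
     */
    interface FieldConverter extends SerializableFunction<BsonValue, Object> {}
}
```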
   



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.json.JsonParseException;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD;
+
+/** Tool class used to convert from {@link RowData} to {@link BsonValue}. */
+@Internal
+public class RowDataToBsonConverters {
+
+    // --------------------------------------------------------------------------------
+    // Runtime Converters
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+     * corresponding {@link BsonValue} data structures.
+     */
+    @FunctionalInterface
+    public interface RowDataToBsonConverter extends Serializable {
+        BsonValue convert(Object value);
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for
+    // sql-connector uber jars.
+    // --------------------------------------------------------------------------------
+
+    public static RowDataToBsonConverter createConverter(LogicalType type) {
+        return wrapIntoNullSafeInternalConverter(createInternalConverter(type), type);
+    }
+
+    private static RowDataToBsonConverter wrapIntoNullSafeInternalConverter(
+            RowDataToBsonConverter rowDataToBsonConverter, LogicalType type) {
+        return new RowDataToBsonConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public BsonValue convert(Object value) {
+                if (value == null || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+                    if (type.isNullable()) {
+                        return BsonNull.VALUE;
+                    } else {
+                        throw new IllegalArgumentException(
+                                "The column type is <"
+                                        + type
+                                        + ">, but a null value is being written into it");
+                    }
+                } else {
+                    return rowDataToBsonConverter.convert(value);
+                }
+            }
+        };
+    }
+
+    private static RowDataToBsonConverter createInternalConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new RowDataToBsonConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue convert(Object value) {
+                        return null;

Review Comment:
   Why doesn't this return BsonNull.VALUE?
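i.e. something along these lines (a sketch of the NULL branch from the quoted converter):

```java
case NULL:
    return new RowDataToBsonConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public BsonValue convert(Object value) {
            // Returning BsonNull.VALUE keeps the contract of always producing a BsonValue.
            return BsonNull.VALUE;
        }
    };
```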



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.json.JsonParseException;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD;
+
+/** Tool class used to convert from {@link RowData} to {@link BsonValue}. */
+@Internal
+public class RowDataToBsonConverters {
+
+    // --------------------------------------------------------------------------------
+    // Runtime Converters
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+     * corresponding {@link BsonValue} data structures.
+     */
+    @FunctionalInterface
+    public interface RowDataToBsonConverter extends Serializable {
+        BsonValue convert(Object value);
+    }

Review Comment:
   Similar concerns as for BsonToRowDataConverters.
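A matching sketch for this side (names are placeholders): type the public interface against `RowData`, producing a `BsonDocument`, and keep the internal per-field mapping as a `SerializableFunction<Object, BsonValue>`.

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.function.SerializableFunction;

import org.bson.BsonDocument;
import org.bson.BsonValue;

import java.io.Serializable;

/** Sketch mirroring the suggestion for BsonToRowDataConverters. */
public class RowDataConvertersSketch {

    /** Public, typed converter: a top-level row always maps to a BsonDocument. */
    @FunctionalInterface
    public interface RowDataToBsonConverter extends Serializable {
        BsonDocument convert(RowData row);
    }

    /** Internal per-field mapping from Table-internal objects to BsonValue. */
    interface FieldConverter extends SerializableFunction<Object, BsonValue> {}
}
```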



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MongoWriter}. */
+@Testcontainers
+public class MongoWriterITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoWriterITCase.class);
+
+    private static final String TEST_DATABASE = "test_writer";
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .build());
+
+    @Container
+    private static final MongoDBContainer MONGO_CONTAINER =
+            MongoTestUtil.createMongoDBContainer(LOG);
+
+    private static MongoClient mongoClient;
+    private static MetricListener metricListener;
+
+    @BeforeAll
+    static void beforeAll() {
+        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
+    }
+
+    @AfterAll
+    static void afterAll() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @BeforeEach
+    void setUp() {
+        metricListener = new MetricListener();
+    }
+
+    @Test
+    void testWriteOnBulkFlush() throws Exception {
+        final String collection = "test-bulk-flush-without-checkpoint";
+        final boolean flushOnCheckpoint = false;
+        final int batchSize = 5;
+        final int batchIntervalMs = -1;
+
+        try (final MongoWriter<Document> writer =
+                createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
+            writer.write(buildMessage(1), null);
+            writer.write(buildMessage(2), null);
+            writer.write(buildMessage(3), null);
+            writer.write(buildMessage(4), null);
+
+            // Ignore flush on checkpoint
+            writer.flush(false);
+
+            assertThatIdsAreNotWritten(collectionOf(collection), 1, 2, 3, 4);
+
+            // Trigger flush
+            writer.write(buildMessage(5), null);
+            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+
+            writer.write(buildMessage(6), null);
+            assertThatIdsAreNotWritten(collectionOf(collection), 6);
+
+            // Force flush
+            writer.doBulkWrite();
+            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5, 6);
+        }
+    }
+
+    @Test
+    void testWriteOnBatchIntervalFlush() throws Exception {
+        final String collection = "test-bulk-flush-with-interval";
+        final boolean flushOnCheckpoint = false;
+        final int batchSize = -1;
+        final int batchIntervalMs = 1000;
+
+        try (final MongoWriter<Document> writer =
+                createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
+            writer.write(buildMessage(1), null);
+            writer.write(buildMessage(2), null);
+            writer.write(buildMessage(3), null);
+            writer.write(buildMessage(4), null);
+            writer.doBulkWrite();
+        }
+
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4);
+    }
+
+    @Test
+    void testWriteOnCheckpoint() throws Exception {
+        final String collection = "test-bulk-flush-with-checkpoint";
+        final boolean flushOnCheckpoint = true;
+        final int batchSize = -1;
+        final int batchIntervalMs = -1;
+
+        // Enable flush on checkpoint
+        try (final MongoWriter<Document> writer =
+                createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
+            writer.write(buildMessage(1), null);
+            writer.write(buildMessage(2), null);
+            writer.write(buildMessage(3), null);
+
+            assertThatIdsAreNotWritten(collectionOf(collection), 1, 2, 3);
+
+            // Trigger flush
+            writer.flush(false);
+
+            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3);
+        }
+    }
+
+    @Test
+    void testIncrementRecordsSendMetric() throws Exception {
+        final String collection = "test-inc-records-send";
+        final boolean flushOnCheckpoint = false;
+        final int batchSize = 2;
+        final int batchIntervalMs = -1;
+
+        try (final MongoWriter<Document> writer =
+                createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
+            final Optional<Counter> recordsSend =
+                    metricListener.getCounter(MetricNames.NUM_RECORDS_SEND);
+            writer.write(buildMessage(1), null);
+            // Update existing index
+            writer.write(buildMessage(2, "u"), null);
+            // Delete index
+            writer.write(buildMessage(3, "d"), null);
+
+            writer.doBulkWrite();
+
+            assertThat(recordsSend.isPresent()).isTrue();
+            assertThat(recordsSend.get().getCount()).isEqualTo(3L);
+        }
+    }
+
+    @Test
+    void testCurrentSendTime() throws Exception {
+        final String collection = "test-current-send-time";
+        final boolean flushOnCheckpoint = false;
+        final int batchSize = 2;
+        final int batchIntervalMs = -1;
+
+        try (final MongoWriter<Document> writer =
+                createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
+            final Optional<Gauge<Long>> currentSendTime =
+                    metricListener.getGauge("currentSendTime");
+
+            writer.write(buildMessage(1), null);
+            writer.write(buildMessage(2), null);
+
+            writer.doBulkWrite();
+
+            assertThat(currentSendTime.isPresent()).isTrue();
+            assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);

Review Comment:
   This is basically guaranteed to fail at some point because we're dealing with such small time measurements.
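One way to de-flake it (a sketch; whether a sub-millisecond bulk write may legitimately report 0 is an assumption about the gauge's granularity):

```java
// The bulk write can complete within the same millisecond, so only assert non-negativity here;
// a deterministic alternative is to advance the TestProcessingTimeService around doBulkWrite().
assertThat(currentSendTime.isPresent()).isTrue();
assertThat(currentSendTime.get().getValue()).isGreaterThanOrEqualTo(0L);
```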



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java:
##########
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Filters;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDbPointer;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonJavaScript;
+import org.bson.BsonJavaScriptWithScope;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonString;
+import org.bson.BsonSymbol;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.bson.types.Binary;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.table.api.Expressions.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT tests for {@link MongoDynamicTableSink}. */
+@Testcontainers
+public class MongoDynamicTableSinkITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSinkITCase.class);
+
+    @Container
+    private static final MongoDBContainer MONGO_CONTAINER =
+            MongoTestUtil.createMongoDBContainer(LOG);
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    private MongoClient mongoClient;
+
+    @BeforeEach
+    public void setUp() {
+        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @Test
+    public void testSinkWithAllSupportedTypes() throws ExecutionException, InterruptedException {
+        String database = "test";
+        String collection = "sink_with_all_supported_types";
+
+        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE TABLE mongo_sink",
+                                "(",
+                                "  _id BIGINT,",
+                                "  f1 STRING,",
+                                "  f2 BOOLEAN,",
+                                "  f3 BINARY,",
+                                "  f4 INTEGER,",
+                                "  f5 TIMESTAMP_LTZ(6),",
+                                "  f6 TIMESTAMP(3),",
+                                "  f7 DOUBLE,",
+                                "  f8 DECIMAL(10, 2),",
+                                "  f9 MAP<STRING, INTEGER>,",
+                                "  f10 ROW<k INTEGER>,",
+                                "  f11 ARRAY<STRING>,",
+                                "  f12 ARRAY<ROW<k STRING>>,",
+                                "  PRIMARY KEY (_id) NOT ENFORCED",
+                                ") WITH (",
+                                getConnectorSql(database, collection),
+                                ")")));
+
+        Instant now = Instant.now();
+        tEnv.fromValues(
+                        DataTypes.ROW(
+                                DataTypes.FIELD("_id", DataTypes.BIGINT()),
+                                DataTypes.FIELD("f1", DataTypes.STRING()),
+                                DataTypes.FIELD("f2", DataTypes.BOOLEAN()),
+                                DataTypes.FIELD("f3", DataTypes.BINARY(1)),
+                                DataTypes.FIELD("f4", DataTypes.INT()),
+                                DataTypes.FIELD("f5", DataTypes.TIMESTAMP_LTZ(6)),
+                                DataTypes.FIELD("f6", DataTypes.TIMESTAMP(3)),
+                                DataTypes.FIELD("f7", DataTypes.DOUBLE()),
+                                DataTypes.FIELD("f8", DataTypes.DECIMAL(10, 2)),
+                                DataTypes.FIELD(
+                                        "f9", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())),
+                                DataTypes.FIELD(
+                                        "f10",
+                                        DataTypes.ROW(DataTypes.FIELD("k", DataTypes.INT()))),
+                                DataTypes.FIELD("f11", DataTypes.ARRAY(DataTypes.STRING())),
+                                DataTypes.FIELD(
+                                        "f12",
+                                        DataTypes.ARRAY(
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "K", DataTypes.STRING()))))),
+                        Row.of(
+                                1L,
+                                "ABCDE",
+                                true,
+                                new byte[] {(byte) 3},
+                                6,
+                                now,
+                                Timestamp.from(now),
+                                10.10d,
+                                new BigDecimal("11.11"),
+                                Collections.singletonMap("k", 12),
+                                Row.of(13),
+                                Arrays.asList("14_1", "14_2"),
+                                Arrays.asList(Row.of("15_1"), Row.of("15_2"))))
+                .executeInsert("mongo_sink")
+                .await();
+
+        MongoCollection<Document> coll =
+                mongoClient.getDatabase(database).getCollection(collection);
+
+        Document actual = coll.find(Filters.eq("_id", 1L)).first();
+
+        Document expected =
+                new Document("_id", 1L)
+                        .append("f1", "ABCDE")
+                        .append("f2", true)
+                        .append("f3", new Binary(new byte[] {(byte) 3}))
+                        .append("f4", 6)
+                        .append("f5", Date.from(now))
+                        .append("f6", Date.from(now))
+                        .append("f7", 10.10d)
+                        .append("f8", new Decimal128(new BigDecimal("11.11")))
+                        .append("f9", new Document("k", 12))
+                        .append("f10", new Document("k", 13))
+                        .append("f11", Arrays.asList("14_1", "14_2"))
+                        .append(
+                                "f12",
+                                Arrays.asList(
+                                        new Document("k", "15_1"), new Document("k", "15_2")));
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testRoundTripReadAndSink() throws ExecutionException, InterruptedException {
+        String database = "test";
+        String sourceCollection = "test_round_trip_source";
+        String sinkCollection = "test_round_trip_sink";
+
+        BsonDocument testData =
+                new BsonDocument("f1", new BsonString("ABCDE"))
+                        .append("f2", new BsonBoolean(true))
+                        .append("f3", new BsonBinary(new byte[] {(byte) 3}))
+                        .append("f4", new BsonInt32(32))
+                        .append("f5", new BsonInt64(64L))
+                        .append("f6", new BsonDouble(128.128d))
+                        .append("f7", new BsonDecimal128(new Decimal128(new BigDecimal("256.256"))))
+                        .append("f8", new BsonDateTime(Instant.now().toEpochMilli()))
+                        .append("f9", new BsonTimestamp((int) Instant.now().getEpochSecond(), 100))
+                        .append(
+                                "f10",
+                                new BsonRegularExpression(Pattern.compile("^9$").pattern(), "i"))
+                        .append("f11", new BsonJavaScript("function() { return 10; }"))
+                        .append(
+                                "f12",
+                                new BsonJavaScriptWithScope(
+                                        "function() { return 11; }", new BsonDocument()))
+                        .append("f13", new BsonDbPointer("test.test", new ObjectId()))
+                        .append("f14", new BsonSymbol("symbol"))
+                        .append(
+                                "f15",
+                                new BsonArray(Arrays.asList(new BsonInt32(1), new BsonInt32(2))))
+                        .append("f16", new BsonDocument("k", new BsonInt32(32)));
+
+        MongoCollection<BsonDocument> sourceColl =
+                mongoClient
+                        .getDatabase(database)
+                        .getCollection(sourceCollection)
+                        .withDocumentClass(BsonDocument.class);
+        sourceColl.insertOne(testData);
+
+        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_source (\n"
+                                + "`_id` STRING,\n"
+                                + "`f1` STRING,\n"
+                                + "`f2` BOOLEAN,\n"
+                                + "`f3` BINARY,\n"
+                                + "`f4` INTEGER,\n"
+                                + "`f5` BIGINT,\n"
+                                + "`f6` DOUBLE,\n"
+                                + "`f7` DECIMAL(10, 3),\n"
+                                + "`f8` TIMESTAMP_LTZ(3),\n"
+                                + "`f9` STRING,\n"
+                                + "`f10` STRING,\n"
+                                + "`f11` STRING,\n"
+                                + "`f12` STRING,\n"
+                                + "`f13` STRING,\n"
+                                + "`f14` STRING,\n"
+                                + "`f15` ARRAY<INTEGER>,\n"
+                                + "`f16` ROW<k INTEGER>,\n"
+                                + " PRIMARY KEY (_id) NOT ENFORCED\n"
+                                + ") WITH ( %s )",
+                        getConnectorSql(database, sourceCollection)));
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_sink WITH ( %s ) LIKE 
mongo_source",
+                        getConnectorSql(database, sinkCollection)));
+
+        tEnv.executeSql("insert into mongo_sink select * from 
mongo_source").await();
+
+        MongoCollection<BsonDocument> sinkColl =
+                mongoClient
+                        .getDatabase(database)
+                        .getCollection(sinkCollection)
+                        .withDocumentClass(BsonDocument.class);
+
+        BsonDocument actual = sinkColl.find().first();
+
+        assertThat(actual).isEqualTo(testData);
+    }
+
+    @Test
+    public void testSinkWithAllRowKind() throws ExecutionException, 
InterruptedException {
+        String database = "test";
+        String collection = "test_sink_with_all_row_kind";
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        DataStream<Row> sourceStream =
+                env.fromCollection(
+                                Arrays.asList(
+                                        Row.ofKind(RowKind.INSERT, 1L, 
"Alice"),
+                                        Row.ofKind(RowKind.DELETE, 1L, 
"Alice"),
+                                        Row.ofKind(RowKind.INSERT, 2L, "Bob"),
+                                        Row.ofKind(RowKind.UPDATE_BEFORE, 2L, 
"Bob"),
+                                        Row.ofKind(RowKind.UPDATE_AFTER, 2L, 
"Tom")))
+                        .returns(
+                                new RowTypeInfo(
+                                        new TypeInformation[] {Types.LONG, 
Types.STRING},
+                                        new String[] {"id", "name"}));
+
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .build();
+
+        Table sourceTable = tEnv.fromChangelogStream(sourceStream, 
sourceSchema);
+        tEnv.createTemporaryView("value_source", sourceTable);
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_sink (\n"
+                                + "`_id` BIGINT,\n"
+                                + "`name` STRING,\n"
+                                + " PRIMARY KEY (_id) NOT ENFORCED\n"
+                                + ") WITH ( %s )",
+                        getConnectorSql(database, collection)));
+
+        tEnv.executeSql("insert into mongo_sink select * from 
value_source").await();
+
+        MongoCollection<Document> coll =
+                mongoClient.getDatabase(database).getCollection(collection);
+
+        List<Document> expected =
+                Collections.singletonList(new Document("_id", 
2L).append("name", "Tom"));
+
+        List<Document> actual = coll.find().into(new ArrayList<>());
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testSinkWithReservedId() throws Exception {
+        String database = "test";
+        String collection = "sink_with_reserved_id";
+
+        TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_sink ("
+                                + "_id STRING NOT NULL,\n"
+                                + "f1 STRING NOT NULL,\n"
+                                + "PRIMARY KEY (_id) NOT ENFORCED\n"
+                                + ")\n"
+                                + "WITH (%s)",
+                        getConnectorSql(database, collection)));
+
+        ObjectId objectId = new ObjectId();
+        tEnv.fromValues(row(objectId.toHexString(), "r1"), row("str", "r2"))
+                .executeInsert("mongo_sink")
+                .await();
+
+        MongoCollection<Document> coll =
+                mongoClient.getDatabase(database).getCollection(collection);
+
+        List<Document> actual = new ArrayList<>();
+        coll.find(Filters.in("_id", objectId, "str")).into(actual);
+
+        Document[] expected =
+                new Document[] {
+                    new Document("_id", objectId).append("f1", "r1"),
+                    new Document("_id", "str").append("f1", "r2")
+                };
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
+    public void testSinkWithoutPrimaryKey() throws Exception {
+        String database = "test";
+        String collection = "sink_without_primary_key";
+
+        TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_sink (" + "f1 STRING NOT NULL\n" + 
")\n" + "WITH (%s)",
+                        getConnectorSql(database, collection)));
+
+        tEnv.fromValues(row("d1"), 
row("d1")).executeInsert("mongo_sink").await();
+
+        MongoCollection<Document> coll =
+                mongoClient.getDatabase(database).getCollection(collection);
+
+        List<Document> actual = new ArrayList<>();
+        coll.find().into(actual);
+
+        assertThat(actual).hasSize(2);
+        for (Document doc : actual) {
+            assertThat(doc.get("f1")).isEqualTo("d1");
+        }
+    }
+
+    @Test
+    public void testSinkWithNonCompositePrimaryKey() throws Exception {
+        String database = "test";
+        String collection = "sink_with_non_composite_pk";
+
+        Instant now = Instant.now();
+        List<Expression> testValues =
+                Collections.singletonList(
+                        row(1L, true, "ABCDE", 12.12d, 4, Timestamp.from(now), 
now));

Review Comment:
   ```suggestion
                           row(2L, true, "ABCDE", 12.12d, 4, 
Timestamp.from(now), now));
   ```
   Let's use a value that doesn't happen to coincide with the generated(?) `_id`.



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoScanSplitAssigner.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import 
org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitters;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+
+import com.mongodb.MongoNamespace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** The split assigner for {@link MongoScanSourceSplit}. */
+@Internal
+public class MongoScanSplitAssigner implements MongoSplitAssigner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoScanSplitAssigner.class);
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoReadOptions readOptions;
+
+    private final LinkedList<String> remainingCollections;
+    private final List<String> alreadyProcessedCollections;
+    private final List<MongoScanSourceSplit> remainingScanSplits;
+    private final Map<String, MongoScanSourceSplit> assignedScanSplits;
+    private boolean initialized;
+
+    private transient MongoSplitters mongoSplitters;
+
+    public MongoScanSplitAssigner(
+            MongoConnectionOptions connectionOptions,
+            MongoReadOptions readOptions,
+            MongoSourceEnumState sourceEnumState) {
+        this.connectionOptions = connectionOptions;
+        this.readOptions = readOptions;
+        this.remainingCollections = new 
LinkedList<>(sourceEnumState.getRemainingCollections());
+        this.alreadyProcessedCollections = 
sourceEnumState.getAlreadyProcessedCollections();
+        this.remainingScanSplits = sourceEnumState.getRemainingScanSplits();
+        this.assignedScanSplits = sourceEnumState.getAssignedScanSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public void open() {
+        LOG.info("Mongo scan split assigner is opening.");
+        if (!initialized) {
+            String collectionId =
+                    String.format(
+                            "%s.%s",
+                            connectionOptions.getDatabase(), 
connectionOptions.getCollection());
+            remainingCollections.add(collectionId);
+            mongoSplitters = new MongoSplitters(connectionOptions, 
readOptions);
+            initialized = true;
+        }
+    }
+
+    @Override
+    public Optional<MongoSourceSplit> getNext() {
+        if (!remainingScanSplits.isEmpty()) {
+            // return remaining splits firstly
+            Iterator<MongoScanSourceSplit> iterator = 
remainingScanSplits.iterator();
+            MongoScanSourceSplit split = iterator.next();
+            iterator.remove();
+            assignedScanSplits.put(split.splitId(), split);
+            return Optional.of(split);
+        } else {
+            // it's turn for next collection
+            String nextCollection = remainingCollections.pollFirst();
+            if (nextCollection != null) {
+                // split the given collection into chunks (scan splits)
+                Collection<MongoScanSourceSplit> splits =
+                        mongoSplitters.split(new 
MongoNamespace(nextCollection));
+                remainingScanSplits.addAll(splits);
+                alreadyProcessedCollections.add(nextCollection);
+                return getNext();
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    @Override
+    public void addSplitsBack(Collection<MongoSourceSplit> splits) {
+        for (MongoSourceSplit split : splits) {
+            if (split instanceof MongoScanSourceSplit) {
+                remainingScanSplits.add((MongoScanSourceSplit) split);
+                // we should remove the add-backed splits from the assigned 
list,
+                // because they are failed
+                assignedScanSplits.remove(split.splitId());
+            }
+        }
+    }
+
+    @Override
+    public MongoSourceEnumState snapshotState(long checkpointId) {
+        return new MongoSourceEnumState(
+                remainingCollections,
+                alreadyProcessedCollections,
+                remainingScanSplits,
+                assignedScanSplits,
+                initialized);
+    }
+
+    @Override
+    public boolean noMoreSplits() {
+        return initialized && remainingCollections.isEmpty() && 
remainingScanSplits.isEmpty();
+    }

Review Comment:
   I'd rather add a precondition check that `initialized` is true. We don't want this to be called in any other situation, and as-is we might mask such a problem.
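   A minimal sketch of that check, assuming the assigner keeps using Flink's `Preconditions` utility (the message wording is just a placeholder):
   ```java
   import static org.apache.flink.util.Preconditions.checkState;

   @Override
   public boolean noMoreSplits() {
       // Fail fast instead of silently returning false when the assigner was never opened.
       checkState(initialized, "noMoreSplits() was called before the assigner was opened.");
       return remainingCollections.isEmpty() && remainingScanSplits.isEmpty();
   }
   ```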



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoSplitAssigner.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+
+/** The split assigner for {@link MongoSourceSplit}. */
+@Internal
+public interface MongoSplitAssigner extends Serializable {
+
+    /**
+     * Called to open the assigner to acquire any resources, like threads or 
network connections.
+     */
+    void open();
+
+    /**
+     * Called to close the assigner, in case it holds on to any resources, 
like threads or network
+     * connections.
+     */
+    void close() throws IOException;
+
+    /** Gets the next split. */
+    Optional<MongoSourceSplit> getNext();

Review Comment:
   Document that. Specifically, its relationship to `noMoreSplits()`.
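   One possible wording for that contract, as a Javadoc sketch (the phrasing is only a suggestion):
   ```java
   /**
    * Gets the next split to assign.
    *
    * <p>An empty {@link Optional} does not by itself mean the assigner is exhausted; it only
    * means that no split is available right now. Callers should consult {@link #noMoreSplits()}
    * to decide whether to stop requesting splits or to ask again later.
    */
   Optional<MongoSourceSplit> getNext();
   ```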
   



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/serialization/MongoRowDataDeserializationSchema.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.serialization;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
+import 
org.apache.flink.connector.mongodb.table.converter.BsonToRowDataConverters;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.bson.BsonDocument;
+
+/** Deserializer that {@link BsonDocument} to flink internal {@link RowData}. 
*/

Review Comment:
   ```suggestion
   /** Deserializer that maps {@link BsonDocument} to {@link RowData}. */
   ```



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.common.utils.MongoUtils;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+/** To split collections of MongoDB to {@link MongoSourceSplit}s. */
+@Internal
+public class MongoSplitters implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoSplitters.class);
+
+    private final MongoReadOptions readOptions;
+    private final MongoClient mongoClient;
+
+    public MongoSplitters(MongoConnectionOptions connectionOptions, 
MongoReadOptions readOptions) {
+        this.readOptions = readOptions;
+        this.mongoClient = MongoClients.create(connectionOptions.getUri());
+    }
+
+    public Collection<MongoScanSourceSplit> split(MongoNamespace namespace) {
+        BsonDocument collStats;
+        try {
+            collStats = MongoUtils.collStats(mongoClient, namespace);
+        } catch (MongoException e) {
+            LOG.error("Execute collStats command failed, with error message: 
{}", e.getMessage());
+            throw new FlinkRuntimeException(e);
+        }
+
+        MongoSplitContext splitContext =
+                MongoSplitContext.of(readOptions, mongoClient, namespace, 
collStats);
+
+        switch (readOptions.getPartitionStrategy()) {
+            case SINGLE:
+                return MongoSingleSplitter.INSTANCE.split(splitContext);

Review Comment:
   These could all be static methods; the singleton pattern doesn't really give us any benefit.
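   For illustration, the `SINGLE` branch could then read as below, assuming `split` becomes a static method on each splitter; the remaining strategies would follow the same pattern:
   ```java
   switch (readOptions.getPartitionStrategy()) {
       case SINGLE:
           // Plain static call instead of going through a singleton INSTANCE field.
           return MongoSingleSplitter.split(splitContext);
       // ... other strategies handled the same way ...
       default:
           throw new IllegalStateException(
                   "Unknown partition strategy " + readOptions.getPartitionStrategy());
   }
   ```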



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.testutils;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+
+/** Sharded Containers. */
+public class MongoShardedContainers implements BeforeAllCallback, 
AfterAllCallback {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoShardedContainers.class);
+
+    private static final int MONGODB_INTERNAL_PORT = 27017;
+
+    private static final String CONFIG_REPLICA_SET_NAME = "rs-config-0";
+    private static final String SHARD_REPLICA_SET_NAME = "rs-shard-0";
+
+    private static final String CONFIG_HOSTNAME = "config-0";
+    private static final String SHARD_HOSTNAME = "shard-0";
+    private static final String ROUTER_HOSTNAME = "router-0";
+
+    private final MongoDBContainer configSrv;
+    private final MongoDBContainer shardSrv;
+    private final MongoDBContainer router;
+
+    MongoShardedContainers(DockerImageName dockerImageName, Network network) {
+        Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG);
+        this.configSrv =
+                new MongoDBContainer(dockerImageName)
+                        .withCreateContainerCmdModifier(it -> 
it.withHostName(CONFIG_HOSTNAME))
+                        .withCommand(
+                                "-configsvr",
+                                "--replSet",
+                                CONFIG_REPLICA_SET_NAME,
+                                "--port",
+                                String.valueOf(MONGODB_INTERNAL_PORT))
+                        .withNetwork(network)
+                        .withNetworkAliases(CONFIG_HOSTNAME)
+                        .withLogConsumer(logConsumer);
+        this.shardSrv =
+                new MongoDBContainer(dockerImageName)
+                        .withCreateContainerCmdModifier(it -> 
it.withHostName(SHARD_HOSTNAME))
+                        .withCommand(
+                                "-shardsvr",
+                                "--replSet",
+                                SHARD_REPLICA_SET_NAME,
+                                "--port",
+                                String.valueOf(MONGODB_INTERNAL_PORT))
+                        .withNetwork(network)
+                        .withNetworkAliases(SHARD_HOSTNAME)
+                        .withLogConsumer(logConsumer);
+        this.router =
+                new MongoRouterContainer(dockerImageName)
+                        .withCreateContainerCmdModifier(it -> 
it.withHostName(ROUTER_HOSTNAME))
+                        .dependsOn(configSrv, shardSrv)
+                        .withNetwork(network)
+                        .withNetworkAliases(ROUTER_HOSTNAME)
+                        .withLogConsumer(logConsumer);
+    }
+
+    public void start() {
+        LOG.info("Starting ConfigSrv container");
+        configSrv.start();
+        LOG.info("Starting ShardSrv container");
+        shardSrv.start();
+        LOG.info("Starting Router containers");
+        router.start();
+    }
+
+    public void close() {
+        router.stop();
+        shardSrv.stop();
+        configSrv.stop();
+    }

Review Comment:
   Why are these public? Can we just inline them into the callback methods?
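   A sketch of the inlined variant, assuming nothing outside this class needs to start or stop the containers directly:
   ```java
   @Override
   public void beforeAll(ExtensionContext context) {
       LOG.info("Starting ConfigSrv container");
       configSrv.start();
       LOG.info("Starting ShardSrv container");
       shardSrv.start();
       LOG.info("Starting Router container");
       router.start();
   }

   @Override
   public void afterAll(ExtensionContext context) {
       router.stop();
       shardSrv.stop();
       configSrv.stop();
   }
   ```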



##########
flink-connector-mongodb/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO

Review Comment:
   ```suggestion
   rootLogger.level = OFF
   ```



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.json.JsonParseException;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD;
+
+/** Tool class used to convert from {@link RowData} to {@link BsonValue}. */
+@Internal
+public class RowDataToBsonConverters {
+
+    // 
--------------------------------------------------------------------------------
+    // Runtime Converters
+    // 
--------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts objects of Flink Table & SQL internal 
data structures to
+     * corresponding {@link BsonValue} data structures.
+     */
+    @FunctionalInterface
+    public interface RowDataToBsonConverter extends Serializable {
+        BsonValue convert(Object value);
+    }
+
+    // 
--------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason 
here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate 
Bson for
+    // sql-connector uber jars.
+    // 
--------------------------------------------------------------------------------
+
+    public static RowDataToBsonConverter createConverter(LogicalType type) {

Review Comment:
   Should only accept RowData
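   A sketch of the narrowed entry point; `createInternalConverter` is a hypothetical name for the existing `LogicalType`-based dispatch, turned private:
   ```java
   /** Creates a converter for the top-level row type; nested fields are resolved internally. */
   public static RowDataToBsonConverter createConverter(RowType rowType) {
       // Only rows are accepted publicly; the generic LogicalType dispatch stays an
       // implementation detail (hypothetical private method).
       return createInternalConverter(rowType);
   }
   ```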



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+
+import java.time.Duration;
+
+/**
+ * Base options for the MongoDB connector. Needs to be public so that the 
{@link
+ * org.apache.flink.table.api.TableDescriptor} can access it.
+ */
+@PublicEvolving
+public class MongoConnectorOptions {
+
+    private MongoConnectorOptions() {}
+
+    public static final ConfigOption<String> URI =
+            ConfigOptions.key("uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specifies the connection uri of 
MongoDB.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specifies the database to read or write 
of MongoDB.");
+
+    public static final ConfigOption<String> COLLECTION =
+            ConfigOptions.key("collection")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specifies the collection to read or 
write of MongoDB.");
+
+    public static final ConfigOption<Integer> SCAN_FETCH_SIZE =
+            ConfigOptions.key("scan.fetch-size")
+                    .intType()
+                    .defaultValue(2048)
+                    .withDescription(
+                            "Gives the reader a hint as to the number of 
documents that should be fetched from the database per round-trip when reading. 
");
+
+    public static final ConfigOption<Integer> SCAN_CURSOR_BATCH_SIZE =

Review Comment:
   This isn't enough information for users, and the current descriptions are highly misleading.
   
   The scan.fetch-size description states "the number of documents that should be fetched from the database", but it's rather the number of records _consumed_ in one fetch() call from one or more retrieved batches.
   But this is a highly technical description that users won't know what to do with.
   
   We just have to expand a bit on the trade-offs.
   Batch size reduces the number of calls to MongoDB at the cost of higher memory consumption in Flink and a higher risk of throwing away fetched data (when fetch-size is significantly lower and a failure occurs), while fetch size is all about keeping the source responsive to checkpointing.
   
   I do wonder though whether fetch-size should be necessary at all. To me it really points to a problem with how `fetch()` is implemented / designed.
   What would be the overhead of just exiting fetch() every 10 records?
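   For the fetch-size option, a hedged rewording along those lines could look like this (the exact wording is up for discussion; the batch-size description would carry the mirror-image trade-off):
   ```java
   public static final ConfigOption<Integer> SCAN_FETCH_SIZE =
           ConfigOptions.key("scan.fetch-size")
                   .intType()
                   .defaultValue(2048)
                   .withDescription(
                           "The number of documents the source consumes per fetch() call, taken from "
                                   + "already retrieved cursor batches. Smaller values keep the source "
                                   + "responsive to checkpointing; larger values reduce per-call overhead.");
   ```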



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.utils.MongoValidationUtils;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.bson.BsonObjectId;
+import org.bson.BsonValue;
+import org.bson.types.ObjectId;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An extractor for a MongoDB key from a {@link RowData}. */
+@Internal
+public class MongoKeyExtractor implements SerializableFunction<RowData, 
BsonValue> {
+
+    public static final String RESERVED_ID = ID_FIELD;
+
+    private static final AppendOnlyKeyExtractor APPEND_ONLY_KEY_EXTRACTOR =
+            new AppendOnlyKeyExtractor();
+
+    private final int[] primaryKeyIndexes;
+
+    private final RowDataToBsonConverter primaryKeyConverter;
+
+    private final FieldGetter primaryKeyGetter;
+
+    private MongoKeyExtractor(LogicalType primaryKeyType, int[] 
primaryKeyIndexes) {
+        this.primaryKeyIndexes = primaryKeyIndexes;
+        this.primaryKeyConverter = 
RowDataToBsonConverters.createConverter(primaryKeyType);
+        if (isCompoundPrimaryKey(primaryKeyIndexes)) {
+            this.primaryKeyGetter =
+                    rowData -> 
ProjectedRowData.from(primaryKeyIndexes).replaceRow(rowData);
+        } else {
+            this.primaryKeyGetter = RowData.createFieldGetter(primaryKeyType, 
primaryKeyIndexes[0]);
+        }
+    }
+
+    @Override
+    public BsonValue apply(RowData rowData) {
+        Object rowKeyValue = primaryKeyGetter.getFieldOrNull(rowData);
+        checkNotNull(rowKeyValue, "Primary key value is null of RowData: " + 
rowData);
+        BsonValue keyValue = primaryKeyConverter.convert(rowKeyValue);
+        if (!isCompoundPrimaryKey(primaryKeyIndexes) && keyValue.isString()) {
+            String keyString = keyValue.asString().getValue();
+            // Try to restore MongoDB's ObjectId from string.
+            if (ObjectId.isValid(keyString)) {
+                keyValue = new BsonObjectId(new ObjectId(keyString));
+            }
+        }
+        return keyValue;
+    }
+
+    public static SerializableFunction<RowData, BsonValue> createKeyExtractor(
+            ResolvedSchema resolvedSchema) {
+
+        Optional<UniqueConstraint> primaryKey = resolvedSchema.getPrimaryKey();
+        int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes();
+        Optional<Column> reservedId = resolvedSchema.getColumn(RESERVED_ID);
+
+        // Primary key is not declared and reserved _id is not present.
+        if (!primaryKey.isPresent() && !reservedId.isPresent()) {
+            return APPEND_ONLY_KEY_EXTRACTOR;
+        }
+
+        if (reservedId.isPresent()) {
+            // Ambiguous keys being used due to the presence of an _id field.
+            if (!primaryKey.isPresent()
+                    || isCompoundPrimaryKey(primaryKeyIndexes)
+                    || !primaryKeyContainsReservedId(primaryKey.get())) {
+                throw new IllegalArgumentException(
+                        "Ambiguous keys being used due to the presence of an 
_id field.");

Review Comment:
   Add instructions for how to resolve that.
   
   For example, "Either use the _id column as the key, or rename the _id 
column".
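   As a sketch, the exception message could spell out the fix, roughly along these lines (wording is only a suggestion):
   ```java
   throw new IllegalArgumentException(
           "Ambiguous key: an _id column is present but is not the single declared primary key. "
                   + "Either declare the _id column as the primary key, or rename the _id column.");
   ```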



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.utils.MongoValidationUtils;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.bson.BsonObjectId;
+import org.bson.BsonValue;
+import org.bson.types.ObjectId;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An extractor for a MongoDB key from a {@link RowData}. */
+@Internal
+public class MongoKeyExtractor implements SerializableFunction<RowData, 
BsonValue> {
+
+    public static final String RESERVED_ID = ID_FIELD;
+
+    private static final AppendOnlyKeyExtractor APPEND_ONLY_KEY_EXTRACTOR =
+            new AppendOnlyKeyExtractor();
+
+    private final int[] primaryKeyIndexes;
+
+    private final RowDataToBsonConverter primaryKeyConverter;
+
+    private final FieldGetter primaryKeyGetter;
+
+    private MongoKeyExtractor(LogicalType primaryKeyType, int[] 
primaryKeyIndexes) {
+        this.primaryKeyIndexes = primaryKeyIndexes;
+        this.primaryKeyConverter = 
RowDataToBsonConverters.createConverter(primaryKeyType);
+        if (isCompoundPrimaryKey(primaryKeyIndexes)) {
+            this.primaryKeyGetter =
+                    rowData -> 
ProjectedRowData.from(primaryKeyIndexes).replaceRow(rowData);
+        } else {
+            this.primaryKeyGetter = RowData.createFieldGetter(primaryKeyType, 
primaryKeyIndexes[0]);
+        }
+    }
+
+    @Override
+    public BsonValue apply(RowData rowData) {
+        Object rowKeyValue = primaryKeyGetter.getFieldOrNull(rowData);
+        checkNotNull(rowKeyValue, "Primary key value is null of RowData: " + 
rowData);
+        BsonValue keyValue = primaryKeyConverter.convert(rowKeyValue);
+        if (!isCompoundPrimaryKey(primaryKeyIndexes) && keyValue.isString()) {
+            String keyString = keyValue.asString().getValue();
+            // Try to restore MongoDB's ObjectId from string.
+            if (ObjectId.isValid(keyString)) {
+                keyValue = new BsonObjectId(new ObjectId(keyString));
+            }
+        }
+        return keyValue;
+    }
+
+    public static SerializableFunction<RowData, BsonValue> createKeyExtractor(
+            ResolvedSchema resolvedSchema) {
+
+        Optional<UniqueConstraint> primaryKey = resolvedSchema.getPrimaryKey();
+        int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes();
+        Optional<Column> reservedId = resolvedSchema.getColumn(RESERVED_ID);
+
+        // Primary key is not declared and reserved _id is not present.
+        if (!primaryKey.isPresent() && !reservedId.isPresent()) {
+            return APPEND_ONLY_KEY_EXTRACTOR;
+        }
+
+        if (reservedId.isPresent()) {
+            // Ambiguous keys being used due to the presence of an _id field.
+            if (!primaryKey.isPresent()
+                    || isCompoundPrimaryKey(primaryKeyIndexes)
+                    || !primaryKeyContainsReservedId(primaryKey.get())) {
+                throw new IllegalArgumentException(
+                        "Ambiguous keys being used due to the presence of an 
_id field.");
+            }
+        }
+
+        DataType primaryKeyType;
+        if (isCompoundPrimaryKey(primaryKeyIndexes)) {
+            DataType physicalRowDataType = 
resolvedSchema.toPhysicalRowDataType();
+            primaryKeyType = 
Projection.of(primaryKeyIndexes).project(physicalRowDataType);
+        } else {
+            int primaryKeyIndex = primaryKeyIndexes[0];
+            Optional<Column> column = 
resolvedSchema.getColumn(primaryKeyIndex);
+            if (!column.isPresent()) {
+                throw new IllegalStateException(
+                        String.format(
+                                "No primary key column found with index 
'%s'.", primaryKeyIndex));
+            }
+            primaryKeyType = column.get().getDataType();
+        }
+
+        MongoValidationUtils.validatePrimaryKey(primaryKeyType);
+
+        return new MongoKeyExtractor(primaryKeyType.getLogicalType(), 
primaryKeyIndexes);
+    }
+
+    private static boolean isCompoundPrimaryKey(int[] primaryKeyIndexes) {
+        return primaryKeyIndexes.length > 1;
+    }
+
+    private static boolean primaryKeyContainsReservedId(UniqueConstraint 
primaryKey) {
+        return primaryKey.getColumns().contains(RESERVED_ID);
+    }
+
+    /**
+     * It behaves as append-only when no primary key is declared and reserved 
_id is not present. We
+     * use static class instead of lambda for a reason here. It is necessary 
because the maven shade

Review Comment:
   ```suggestion
        * use static class instead of lambda because the maven shade
   ```



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.utils.MongoValidationUtils;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.bson.BsonObjectId;
+import org.bson.BsonValue;
+import org.bson.types.ObjectId;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An extractor for a MongoDB key from a {@link RowData}. */
+@Internal
+public class MongoKeyExtractor implements SerializableFunction<RowData, 
BsonValue> {

Review Comment:
   Missing `serialVersionUID`.
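   e.g. the usual explicit declaration (the value itself is arbitrary):
   ```java
   private static final long serialVersionUID = 1L;
   ```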



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_BATCH_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SAMPLES;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_STRATEGY;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test for {@link MongoDynamicTableSource} and {@link MongoDynamicTableSink} 
created by {@link
+ * MongoDynamicTableFactory}.
+ */
+public class MongoDynamicTableFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("aaa", DataTypes.INT().notNull()),
+                            Column.physical("bbb", 
DataTypes.STRING().notNull()),
+                            Column.physical("ccc", DataTypes.DOUBLE()),
+                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
+                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("name", Arrays.asList("bbb", 
"aaa")));
+
+    @Test
+    public void testMongoCommonProperties() {
+        Map<String, String> properties = getRequiredOptions();
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(SCHEMA, 
properties);
+
+        MongoConnectionOptions connectionOptions =
+                MongoConnectionOptions.builder()
+                        .setUri("mongodb://127.0.0.1:27017")
+                        .setDatabase("test_db")
+                        .setCollection("test_coll")
+                        .build();
+
+        MongoDynamicTableSource expectedSource =
+                new MongoDynamicTableSource(
+                        connectionOptions,
+                        MongoReadOptions.builder().build(),
+                        null,
+                        LookupOptions.MAX_RETRIES.defaultValue(),
+                        LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+                        SCHEMA.toPhysicalRowDataType());
+        assertThat(actualSource).isEqualTo(expectedSource);
+
+        // validation for sink

Review Comment:
   Split the test case instead of adding such comments.
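   A sketch of the split, assuming the shared setup stays in `getRequiredOptions()` (the method names are illustrative):
   ```java
   @Test
   public void testMongoCommonSourceProperties() {
       DynamicTableSource actualSource = createTableSource(SCHEMA, getRequiredOptions());
       // ... source-side assertions only ...
   }

   @Test
   public void testMongoCommonSinkProperties() {
       DynamicTableSink actualSink = createTableSink(SCHEMA, getRequiredOptions());
       // ... sink-side assertions only ...
   }
   ```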



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.Projections;
+import com.mongodb.client.model.Sorts;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
+
+/**
+ * Sample Partitioner
+ *
+ * <p>Samples the collection to generate partitions.
+ *
+ * <p>Uses the average document size to split the collection into average 
sized chunks
+ *
+ * <p>The partitioner samples the collection, projects and sorts by the 
partition fields. Then uses
+ * every {@code samplesPerPartition} as the value to use to calculate the 
partition boundaries.
+ *
+ * <ul>
+ *   <li>scan.partition.size: The average size (MB) for each partition. Note: 
Uses the average
+ *       document size to determine the number of documents per partition so 
may not be even.
+ *       Defaults to: 64mb.
+ *   <li>scan.partition.samples: The number of samples to take per partition. 
Defaults to: 10. The
+ *       total number of samples taken is calculated as: {@code samples per 
partition * (count of
+ *       documents / number of documents per partition)}.

Review Comment:
   Why aren't we just doing this?
   
   ```
   // the number of elements we want each split to read
   numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes
   sampleCount = count / numDocumentsPerPartition
   totalSplitCount = sampleCount + 1
   ```
   
   What are multiple samples per split supposed to give us? Is the MongoDB sampling not good enough that we need a higher resolution?
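   Translated into Java, that sizing would be roughly the following (the names are illustrative, not the connector's actual fields):
   ```java
   static long computeSplitCount(long documentCount, long partitionSizeInBytes, long avgObjSizeInBytes) {
       // the number of documents we want each split to read
       long numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes;
       // one sampled boundary per partition; the final split runs to the end of the collection
       long sampleCount = documentCount / numDocumentsPerPartition;
       return sampleCount + 1;
   }
   ```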



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/deserializer/MongoJsonDeserializationSchema.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.reader.deserializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.bson.BsonDocument;
+import org.bson.json.JsonMode;
+
+import java.util.Optional;
+
+/**
+ * A schema bridge for deserializing MongoDB's {@code BsonDocument} into a JSON string using
+ * MongoDB's RELAXED {@link JsonMode}.
+ */
+@PublicEvolving
+public class MongoJsonDeserializationSchema implements 
MongoDeserializationSchema<String> {

Review Comment:
   > Used to convert BsonDocument to Json format
   
   That's not a use case, but a description of what it does.
   Who do we expect to use it? Is it only for testing? Production usage?
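   For context, a minimal sketch of the conversion this schema performs, using only the BSON driver API (the `MongoDeserializationSchema` plumbing is omitted and the sample document is made up):
   
   ```java
   import org.bson.BsonDocument;
   import org.bson.json.JsonMode;
   import org.bson.json.JsonWriterSettings;
   
   public class RelaxedJsonExample {
       public static void main(String[] args) {
           BsonDocument document =
                   BsonDocument.parse("{\"_id\": 1, \"code\": \"ABC\", \"quantity\": 10}");
           // Render the document as a RELAXED extended JSON string.
           String relaxedJson =
                   document.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
           System.out.println(relaxedJson);
       }
   }
   ```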



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_BATCH_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SAMPLES;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_STRATEGY;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test for {@link MongoDynamicTableSource} and {@link MongoDynamicTableSink} 
created by {@link
+ * MongoDynamicTableFactory}.
+ */
+public class MongoDynamicTableFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("aaa", DataTypes.INT().notNull()),
+                            Column.physical("bbb", 
DataTypes.STRING().notNull()),
+                            Column.physical("ccc", DataTypes.DOUBLE()),
+                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
+                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("name", Arrays.asList("bbb", 
"aaa")));
+
+    @Test
+    public void testMongoCommonProperties() {
+        Map<String, String> properties = getRequiredOptions();
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(SCHEMA, 
properties);
+
+        MongoConnectionOptions connectionOptions =
+                MongoConnectionOptions.builder()
+                        .setUri("mongodb://127.0.0.1:27017")
+                        .setDatabase("test_db")
+                        .setCollection("test_coll")
+                        .build();
+
+        MongoDynamicTableSource expectedSource =
+                new MongoDynamicTableSource(
+                        connectionOptions,
+                        MongoReadOptions.builder().build(),
+                        null,
+                        LookupOptions.MAX_RETRIES.defaultValue(),
+                        LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+                        SCHEMA.toPhysicalRowDataType());
+        assertThat(actualSource).isEqualTo(expectedSource);
+
+        // validation for sink
+        DynamicTableSink actualSink = createTableSink(SCHEMA, properties);
+
+        MongoWriteOptions writeOptions = MongoWriteOptions.builder().build();
+        MongoDynamicTableSink expectedSink =
+                new MongoDynamicTableSink(
+                        connectionOptions,
+                        writeOptions,
+                        null,
+                        SCHEMA.getPrimaryKey().isPresent(),
+                        SCHEMA.toPhysicalRowDataType(),
+                        MongoKeyExtractor.createKeyExtractor(SCHEMA));
+        assertThat(actualSink).isEqualTo(expectedSink);
+    }
+
+    @Test
+    public void testMongoReadProperties() {
+        Map<String, String> properties = getRequiredOptions();
+        properties.put(SCAN_FETCH_SIZE.key(), "1024");
+        properties.put(SCAN_CURSOR_BATCH_SIZE.key(), "2048");
+        properties.put(SCAN_CURSOR_NO_TIMEOUT.key(), "false");
+        properties.put(SCAN_PARTITION_STRATEGY.key(), "split-vector");
+        properties.put(SCAN_PARTITION_SIZE.key(), "128m");
+        properties.put(SCAN_PARTITION_SAMPLES.key(), "5");
+
+        DynamicTableSource actual = createTableSource(SCHEMA, properties);
+
+        MongoConnectionOptions connectionOptions =
+                MongoConnectionOptions.builder()
+                        .setUri("mongodb://127.0.0.1:27017")
+                        .setDatabase("test_db")
+                        .setCollection("test_coll")
+                        .build();
+        MongoReadOptions readOptions =
+                MongoReadOptions.builder()
+                        .setFetchSize(1024)
+                        .setCursorBatchSize(2048)
+                        .setNoCursorTimeout(false)
+                        .setPartitionStrategy(PartitionStrategy.SPLIT_VECTOR)
+                        .setPartitionSize(MemorySize.ofMebiBytes(128))
+                        .setSamplesPerPartition(5)
+                        .build();
+
+        MongoDynamicTableSource expected =
+                new MongoDynamicTableSource(
+                        connectionOptions,
+                        readOptions,
+                        null,
+                        LookupOptions.MAX_RETRIES.defaultValue(),
+                        LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+                        SCHEMA.toPhysicalRowDataType());
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testMongoLookupProperties() {
+        Map<String, String> properties = getRequiredOptions();
+        properties.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL");
+        properties.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), 
"10s");
+        properties.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), 
"20s");
+        properties.put(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY.key(), 
"false");
+        properties.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "15213");
+        properties.put(LookupOptions.MAX_RETRIES.key(), "10");
+        properties.put(LOOKUP_RETRY_INTERVAL.key(), "20ms");
+
+        DynamicTableSource actual = createTableSource(SCHEMA, properties);
+
+        MongoConnectionOptions connectionOptions =
+                MongoConnectionOptions.builder()
+                        .setUri("mongodb://127.0.0.1:27017")
+                        .setDatabase("test_db")
+                        .setCollection("test_coll")
+                        .build();
+
+        MongoDynamicTableSource expected =
+                new MongoDynamicTableSource(
+                        connectionOptions,
+                        MongoReadOptions.builder().build(),
+                        
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
+                        10,
+                        20,
+                        SCHEMA.toPhysicalRowDataType());
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testMongoSinkProperties() {
+        Map<String, String> properties = getRequiredOptions();
+        properties.put(BUFFER_FLUSH_MAX_ROWS.key(), "1001");
+        properties.put(BUFFER_FLUSH_INTERVAL.key(), "2min");
+        properties.put(DELIVERY_GUARANTEE.key(), "at-least-once");
+        properties.put(SINK_MAX_RETRIES.key(), "5");
+        properties.put(SINK_RETRY_INTERVAL.key(), "2s");
+
+        DynamicTableSink actual = createTableSink(SCHEMA, properties);
+
+        MongoConnectionOptions connectionOptions =
+                MongoConnectionOptions.builder()
+                        .setUri("mongodb://127.0.0.1:27017")
+                        .setDatabase("test_db")
+                        .setCollection("test_coll")
+                        .build();
+        MongoWriteOptions writeOptions =
+                MongoWriteOptions.builder()
+                        .setBatchSize(1001)
+                        .setBatchIntervalMs(TimeUnit.MINUTES.toMillis(2))
+                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+                        .setMaxRetries(5)
+                        .setRetryIntervalMs(TimeUnit.SECONDS.toMillis(2))
+                        .build();
+
+        MongoDynamicTableSink expected =
+                new MongoDynamicTableSink(
+                        connectionOptions,
+                        writeOptions,
+                        null,
+                        SCHEMA.getPrimaryKey().isPresent(),
+                        SCHEMA.toPhysicalRowDataType(),
+                        MongoKeyExtractor.createKeyExtractor(SCHEMA));
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testMongoSinkWithParallelism() {
+        Map<String, String> properties = getRequiredOptions();
+        properties.put("sink.parallelism", "2");
+
+        DynamicTableSink actual = createTableSink(SCHEMA, properties);
+
+        MongoConnectionOptions connectionOptions =
+                MongoConnectionOptions.builder()
+                        .setUri("mongodb://127.0.0.1:27017")
+                        .setDatabase("test_db")
+                        .setCollection("test_coll")
+                        .build();
+
+        MongoWriteOptions writeOptions = MongoWriteOptions.builder().build();
+
+        MongoDynamicTableSink expected =
+                new MongoDynamicTableSink(
+                        connectionOptions,
+                        writeOptions,
+                        2,
+                        SCHEMA.getPrimaryKey().isPresent(),
+                        SCHEMA.toPhysicalRowDataType(),
+                        MongoKeyExtractor.createKeyExtractor(SCHEMA));
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testMongoValidation() {
+        // fetch size lower than 1
+        Map<String, String> properties = getRequiredOptions();
+        properties.put(SCAN_FETCH_SIZE.key(), "0");
+
+        Map<String, String> finalProperties1 = properties;

Review Comment:
   You could make this test a lot nicer with a utility method like `getRequiredOptionsWithSetting(String, String)`.
   ```
   assertThatThrownBy(() -> createTableSource(SCHEMA, getRequiredOptionsWithSetting(SCAN_FETCH_SIZE.key(), "0")))
           .hasStackTraceContaining("The fetch size must be larger than 0.");
   ```
   
   Bonus points for adding another `assertValidationRejects(String, String, String)` method.
   
   ```
   assertValidationRejects(SCAN_FETCH_SIZE.key(), "0", "The fetch size must be larger than 0.");
   ```
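   A possible shape for those helpers (hypothetical names; they assume the test class's existing `getRequiredOptions()`, `SCHEMA`, and AssertJ imports):
   
   ```java
   private static Map<String, String> getRequiredOptionsWithSetting(String key, String value) {
       Map<String, String> options = getRequiredOptions();
       options.put(key, value);
       return options;
   }
   
   private static void assertValidationRejects(String key, String value, String expectedMessage) {
       assertThatThrownBy(() -> createTableSource(SCHEMA, getRequiredOptionsWithSetting(key, value)))
               .hasStackTraceContaining(expectedMessage);
   }
   ```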



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.mongodb.MongoTestUtil;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import javax.annotation.Nullable;
+
+import static 
org.apache.flink.connector.mongodb.MongoTestUtil.assertThatIdsAreWritten;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link MongoSink}. */
+@Testcontainers
+public class MongoSinkITCase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoSinkITCase.class);
+
+    @Container
+    private static final MongoDBContainer MONGO_CONTAINER =
+            MongoTestUtil.createMongoDBContainer(LOG);
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setConfiguration(new Configuration())
+                            .build());
+
+    private static final String TEST_DATABASE = "test_sink";
+
+    private static boolean failed;
+
+    private static MongoClient mongoClient;
+
+    @BeforeAll
+    static void setUp() {
+        failed = false;
+        mongoClient = 
MongoClients.create(MONGO_CONTAINER.getConnectionString());
+    }
+
+    @AfterAll
+    static void tearDown() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(DeliveryGuarantee.class)
+    void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee 
deliveryGuarantee)
+            throws Exception {
+        final String index = "test-sink-with-delivery-" + deliveryGuarantee;
+        boolean failure = false;
+        try {
+            runTest(index, false, deliveryGuarantee, null);
+        } catch (IllegalArgumentException e) {
+            failure = true;
+            
assertThat(deliveryGuarantee).isSameAs(DeliveryGuarantee.EXACTLY_ONCE);

Review Comment:
   I still think we shouldn't even test exactly_once, apart from it being 
rejected (in a dedicated test).
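   One possible shape for such a dedicated rejection test, as a sketch only: it assumes the guarantee is validated when `MongoWriteOptions` is built, which may not be where the connector actually rejects it.
   
   ```java
   @Test
   void testExactlyOnceGuaranteeIsRejected() {
       assertThatThrownBy(
                       () ->
                               MongoWriteOptions.builder()
                                       .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                                       .build())
               .isInstanceOf(IllegalArgumentException.class);
   }
   ```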



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
