slinkydeveloper commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r755230667



##########
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/source/MongodbDynamicTableSource.java
##########
@@ -0,0 +1,63 @@
+package org.apache.flink.mongodb.table.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+
+@Internal
+public class MongodbDynamicTableSource implements ScanTableSource {
+    private final MongodbSinkConf mongodbSinkConf;
+    private final ResolvedSchema tableSchema;

Review comment:
       Whenever possible, please pass the `DataType` provided by 
`tableSchema.toSourceRowDataType()` through the source and sink, rather than 
passing `ResolvedSchema` directly. In this case, you probably need 
`toPhysicalDataType` rather than `toSourceDataType`, as suggested below by 
@Airblader.

##########
File path: 
flink-connectors/flink-connector-mongodb/src/test/java/org/apache/flink/mongodb/streaming/utils/ListSink.java
##########
@@ -0,0 +1,29 @@
+package org.apache.flink.mongodb.streaming.utils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Sink that collects elements into a collection.
+ **/
+public class ListSink<IN> implements SinkFunction<IN> {
+
+    private static List<Object> elements = Lists.newArrayList();

Review comment:
       Please use `new ArrayList<>()`

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>

Review comment:
       Why? Can you just align the slf4j version with the one used in Flink?

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>

Review comment:
       ?

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>

Review comment:
       Same as above

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>

Review comment:
       Why? Can you just align the slf4j version with the one used in Flink?

##########
File path: 
flink-connectors/flink-connector-mongodb/src/test/java/org/apache/flink/mongodb/streaming/utils/ListSink.java
##########
@@ -0,0 +1,29 @@
+package org.apache.flink.mongodb.streaming.utils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Sink that collects elements into a collection.
+ **/
+public class ListSink<IN> implements SinkFunction<IN> {

Review comment:
       Rather than creating this custom class, can you use 
`org.apache.flink.streaming.util.StreamCollector`?

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>

Review comment:
       You don't need the scala suffix

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>

Review comment:
       Same as above

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>

Review comment:
       Why do you need this dependency? Isn't it useful only for testing?

##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connectors</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.15-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-mongodb</artifactId>
+    <name>Flink : Connectors : Kafka</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+               <mongo.version>4.2.2</mongo.version>
+               <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>slf4j-api</artifactId>
+                                       <groupId>org.slf4j</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>

Review comment:
       You don't need the planner dependency in production, only for testing.

##########
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/sink/MongodbDynamicTableSink.java
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.mongodb.table.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.types.DataType;
+
+@Internal
+public class MongodbDynamicTableSink implements DynamicTableSink {
+    private final MongodbSinkConf mongodbSinkConf;
+    private final ResolvedSchema tableSchema;
+
+    public MongodbDynamicTableSink(MongodbSinkConf mongodbSinkConf, 
ResolvedSchema tableSchema) {
+        this.mongodbSinkConf = mongodbSinkConf;
+        this.tableSchema = tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataType dataType = tableSchema.toSinkRowDataType();
+        DataStructureConverter converter = 
context.createDataStructureConverter(dataType);
+        return SinkFunctionProvider.of(new MongodbUpsertSinkFunction(
+                this.mongodbSinkConf,
+                tableSchema.getColumnNames(),

Review comment:
       It's safer to use `DataType.getFieldNames(dataType)` here, since 
`tableSchema.getColumnNames()` might return column names that differ from the 
fields of `dataType`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to