slinkydeveloper commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r755230667
##########
File path:
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/source/MongodbDynamicTableSource.java
##########
@@ -0,0 +1,63 @@
+package org.apache.flink.mongodb.table.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+
+@Internal
+public class MongodbDynamicTableSource implements ScanTableSource {
+ private final MongodbSinkConf mongodbSinkConf;
+ private final ResolvedSchema tableSchema;
Review comment:
Please pass through the source and sink, whenever is possible, directly
the `DataType` provided by `tableSchema.toSourceRowDataType()`, more than
passing `ResolvedSchema` directly. In this case, you probably need
`toPhysicalDataType` more than `toSourceDataType`, as suggested below by
@Airblader.
##########
File path:
flink-connectors/flink-connector-mongodb/src/test/java/org/apache/flink/mongodb/streaming/utils/ListSink.java
##########
@@ -0,0 +1,29 @@
+package org.apache.flink.mongodb.streaming.utils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Sink that collects elements into a collection.
+ **/
+public class ListSink<IN> implements SinkFunction<IN> {
+
+ private static List<Object> elements = Lists.newArrayList();
Review comment:
Please use `new ArrayList<>()`
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
Review comment:
Why? Can you just align the slf4j with the version used in flink?
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
Review comment:
?
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
Review comment:
Same as above
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
Review comment:
Why? Can you just align the slf4j with the version used in flink?
##########
File path:
flink-connectors/flink-connector-mongodb/src/test/java/org/apache/flink/mongodb/streaming/utils/ListSink.java
##########
@@ -0,0 +1,29 @@
+package org.apache.flink.mongodb.streaming.utils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Sink that collects elements into a collection.
+ **/
+public class ListSink<IN> implements SinkFunction<IN> {
Review comment:
More than creating this custom class, can you use
`org.apache.flink.streaming.util.StreamCollector`?
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
Review comment:
You don't need the scala suffix
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
Review comment:
Same as above
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
Review comment:
Why you need this dependency? Isn't it useful only for testing?
##########
File path: flink-connectors/flink-connector-mongodb/pom.xml
##########
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-mongodb</artifactId>
+ <name>Flink : Connectors : Kafka</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mongo.version>4.2.2</mongo.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-core</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${mongo.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
Review comment:
You don't need the planner dependency for production, only for testing
##########
File path:
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/sink/MongodbDynamicTableSink.java
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.mongodb.table.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.types.DataType;
+
+@Internal
+public class MongodbDynamicTableSink implements DynamicTableSink {
+ private final MongodbSinkConf mongodbSinkConf;
+ private final ResolvedSchema tableSchema;
+
+ public MongodbDynamicTableSink(MongodbSinkConf mongodbSinkConf,
ResolvedSchema tableSchema) {
+ this.mongodbSinkConf = mongodbSinkConf;
+ this.tableSchema = tableSchema;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ DataType dataType = tableSchema.toSinkRowDataType();
+ DataStructureConverter converter =
context.createDataStructureConverter(dataType);
+ return SinkFunctionProvider.of(new MongodbUpsertSinkFunction(
+ this.mongodbSinkConf,
+ tableSchema.getColumnNames(),
Review comment:
It's safer to use `DataType.getFieldNames(dataType)` here, since
`tableSchema.getColumnNames()` might have different column names.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]