zentol commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1094408110
########## flink-connector-mongodb-e2e-tests/src/test/resources/e2e_append_only.sql: ########## @@ -0,0 +1,44 @@ +--/* +-- * Licensed to the Apache Software Foundation (ASF) under one +-- * or more contributor license agreements. See the NOTICE file +-- * distributed with this work for additional information +-- * regarding copyright ownership. The ASF licenses this file +-- * to you under the Apache License, Version 2.0 (the +-- * "License"); you may not use this file except in compliance +-- * with the License. You may obtain a copy of the License at +-- * +-- * http://www.apache.org/licenses/LICENSE-2.0 +-- * +-- * Unless required by applicable law or agreed to in writing, software +-- * distributed under the License is distributed on an "AS IS" BASIS, +-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- * See the License for the specific language governing permissions and +-- * limitations under the License. +-- */ + +DROP TABLE IF EXISTS orders; +DROP TABLE IF EXISTS orders_bak; + +CREATE TABLE orders ( + `_id` STRING, + `code` STRING, + `quantity` BIGINT, + PRIMARY KEY (_id) NOT ENFORCED +) WITH ( + 'connector' = 'mongodb', + 'uri' = 'mongodb://mongodb:27017', Review Comment: Can we use a random port for mongodb? This would reduce the chances of this test failing locally when by chance something else is currently running on that port. ########## flink-connector-mongodb-e2e-tests/pom.xml: ########## @@ -0,0 +1,163 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-connector-mongodb-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + + <artifactId>flink-connector-mongodb-e2e-tests</artifactId> + <name>Flink : E2E Tests : MongoDB</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + + <!-- Use fat jar so we don't need to create a user-jar. 
--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-mongodb</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-mongodb</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongodb-driver-sync</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mongodb</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>run-end-to-end-tests</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>end-to-end-tests</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <includes> + <include>**/*.*</include> + </includes> + <systemPropertyVariables> + <moduleDir>${project.basedir}</moduleDir> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>default-test</id> + <phase>none</phase> + </execution> + <execution> + <id>integration-tests</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-mongodb</artifactId> + <version>${project.version}</version> + <destFileName>sql-mongodb.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </execution> + <execution> + <id>store-classpath-in-target-for-tests</id> + <phase>package</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <outputFile>${project.build.directory}/hadoop.classpath</outputFile> + <excludeGroupIds>org.apache.flink</excludeGroupIds> + </configuration> + </execution> Review Comment: This appears unused. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoValidationUtils.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.common.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.table.MongoKeyExtractor; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import org.bson.BsonType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; + +/** Utility methods for validating MongoDB properties. */ +@Internal +public class MongoValidationUtils { + + private static final Logger LOG = LoggerFactory.getLogger(MongoValidationUtils.class); + + public static final Set<LogicalTypeRoot> ALLOWED_PRIMARY_KEY_TYPES = + EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + LogicalTypeRoot.INTERVAL_YEAR_MONTH, + LogicalTypeRoot.INTERVAL_DAY_TIME); + + private static final Set<LogicalTypeRoot> DENIED_PRIMARY_KEY_TYPES = + EnumSet.of( + LogicalTypeRoot.BINARY, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DATE, + LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, + LogicalTypeRoot.ARRAY, + LogicalTypeRoot.MULTISET, + LogicalTypeRoot.MAP, + LogicalTypeRoot.ROW, + LogicalTypeRoot.DISTINCT_TYPE, + LogicalTypeRoot.STRUCTURED_TYPE, + LogicalTypeRoot.NULL, + LogicalTypeRoot.RAW, + LogicalTypeRoot.SYMBOL, + LogicalTypeRoot.UNRESOLVED); + + /** + * Checks that the table does not have a primary key defined on illegal types. In MongoDB the + * primary key is used to calculate the MongoDB document _id, which may be of any {@link + * BsonType} other than a {@link BsonType#ARRAY}. Its value must be unique and immutable in the + * collection. + * + * <p>MongoDB creates a unique index on the _id field during the creation of a collection. There + * are also some constraints on the primary key index. For more detailed introduction, you can + * refer to <a + * href="https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Index-Key-Limit"> + * Index Key Limit</a>. + * + * <ul> + * <li>Before MongoDB 4.2, the total size of an index entry, which can include structural + * overhead depending on the BSON type, must be less than 1024 bytes. + * <li>Starting in version 4.2, MongoDB removes the Index Key Limit. + * </ul> + * + * <p>As of now it is extracted by {@link MongoKeyExtractor} according to the primary key + * specified by the Flink table schema. + * + * <ul> + * <li>When there's only a single field in the specified primary key, we convert the field + * data to bson value as _id of the corresponding document. 
+ * <li>When there's multiple fields in the specified primary key, we convert and composite + * these fields into a {@link BsonType#DOCUMENT} as the _id of the corresponding document. + * For example, if have a primary key statement <code>PRIMARY KEY (f1, f2) NOT ENFORCED + * </code>, the extracted _id will be the form like <code>_id: {f1: v1, f2: v2}</code> + * </ul> + * + * <p>The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link + * LogicalTypeRoot#RAW} type and other types that cannot be converted to {@link BsonType} by + * {@link RowDataToBsonConverters}. + */ + public static void validatePrimaryKey(DataType primaryKeyDataType) { + List<DataType> fieldDataTypes = DataType.getFieldDataTypes(primaryKeyDataType); + List<DataType> illegalTypes = new ArrayList<>(); + for (DataType fieldType : fieldDataTypes) { + LogicalTypeRoot typeRoot = fieldType.getLogicalType().getTypeRoot(); + if (!ALLOWED_PRIMARY_KEY_TYPES.contains(typeRoot)) { + illegalTypes.add(fieldType); + if (!DENIED_PRIMARY_KEY_TYPES.contains(typeRoot)) { + LOG.warn( + "Detected newly added root type {} that should to be explicitly accepted or rejected.", Review Comment: Maybe include something like "Please reach out to the Flink maintainers." ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoScanSplitAssigner.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState; +import org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitters; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; + +import com.mongodb.MongoNamespace; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** The split assigner for {@link MongoScanSourceSplit}. 
*/ +@Internal +public class MongoScanSplitAssigner implements MongoSplitAssigner { + + private static final Logger LOG = LoggerFactory.getLogger(MongoScanSplitAssigner.class); + + private final MongoConnectionOptions connectionOptions; + private final MongoReadOptions readOptions; + + private final LinkedList<String> remainingCollections; + private final List<String> alreadyProcessedCollections; + private final List<MongoScanSourceSplit> remainingScanSplits; + private final Map<String, MongoScanSourceSplit> assignedScanSplits; + private boolean initialized; + + private transient MongoSplitters mongoSplitters; + + public MongoScanSplitAssigner( + MongoConnectionOptions connectionOptions, + MongoReadOptions readOptions, + MongoSourceEnumState sourceEnumState) { + this.connectionOptions = connectionOptions; + this.readOptions = readOptions; + this.remainingCollections = new LinkedList<>(sourceEnumState.getRemainingCollections()); Review Comment: It seems odd that this uses a LinkedList to somewhat achieve queue semantics while remainingSplits is some opaque list. As a whole I'm wondering if these really should be lists or not a `deque`. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java: ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table.converter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.bson.BsonDocument; +import org.bson.BsonType; +import org.bson.BsonValue; +import org.bson.types.Decimal128; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.DEFAULT_JSON_WRITER_SETTINGS; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Tool class used to convert from {@link BsonValue} to {@link RowData}. * */ +@Internal +public class BsonToRowDataConverters { + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + /** + * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal + * data structures. + */ + @FunctionalInterface + public interface BsonToRowDataConverter extends Serializable { + Object convert(BsonValue bsonValue); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for + // sql-connector uber jars. + // -------------------------------------------------------------------------------- + + public static BsonToRowDataConverter createConverter(LogicalType type) { Review Comment: This should only accept `RowType`. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java: ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table.converter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.bson.BsonDocument; +import org.bson.BsonType; +import org.bson.BsonValue; +import org.bson.types.Decimal128; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.DEFAULT_JSON_WRITER_SETTINGS; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Tool class used to convert from {@link BsonValue} to {@link RowData}. * */ +@Internal +public class BsonToRowDataConverters { + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + /** + * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal + * data structures. + */ + @FunctionalInterface + public interface BsonToRowDataConverter extends Serializable { Review Comment: This interface is problematic. It exists for 2 distinct use-cases but mixes the 2 and pays for it with loss of type safety. There is 1 use-case outside of this class where we want to convert a `BsonValue` to a `RowData`, as the name implies. Currently this always requires an unchecked cast. But it's also used internally to extract arbitrary objects from a BsonValue, which could just as well be a `SerializableFunction<BsonValue, Object>`. It's just not really a `RowData` converter, but an arbitrary mapping between Table-supported types and BsonValues. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table.converter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.bson.BsonArray; +import org.bson.BsonBinary; +import org.bson.BsonBoolean; +import org.bson.BsonDateTime; +import org.bson.BsonDecimal128; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonNull; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.json.JsonParseException; +import org.bson.types.Decimal128; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD; + +/** Tool class used to convert from {@link RowData} to {@link BsonValue}. */ +@Internal +public class RowDataToBsonConverters { + + // -------------------------------------------------------------------------------- + // Runtime Converters + // -------------------------------------------------------------------------------- + + /** + * Runtime converter that converts objects of Flink Table & SQL internal data structures to + * corresponding {@link BsonValue} data structures. + */ + @FunctionalInterface + public interface RowDataToBsonConverter extends Serializable { + BsonValue convert(Object value); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for + // sql-connector uber jars. + // -------------------------------------------------------------------------------- + + public static RowDataToBsonConverter createConverter(LogicalType type) { + return wrapIntoNullSafeInternalConverter(createInternalConverter(type), type); + } + + private static RowDataToBsonConverter wrapIntoNullSafeInternalConverter( + RowDataToBsonConverter rowDataToBsonConverter, LogicalType type) { + return new RowDataToBsonConverter() { + private static final long serialVersionUID = 1L; + + @Override + public BsonValue convert(Object value) { + if (value == null || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) { + if (type.isNullable()) { + return BsonNull.VALUE; + } else { + throw new IllegalArgumentException( + "The column type is <" + + type + + ">, but a null value is being written into it"); + } + } else { + return rowDataToBsonConverter.convert(value); + } + } + }; + } + + private static RowDataToBsonConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return new RowDataToBsonConverter() { + private static final long serialVersionUID = 1L; + + @Override + public BsonValue convert(Object value) { + return null; Review Comment: Why doesn't this return BsonNull.VALUE? 
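For illustration, here is a minimal, self-contained sketch of what returning the singleton could look like. The `RowDataToBsonConverter` interface below only mirrors the shape shown in the diff, and `NullConverterSketch` is a hypothetical name used for the example, not part of the connector:

```java
import org.bson.BsonNull;
import org.bson.BsonValue;

import java.io.Serializable;

public class NullConverterSketch {

    /** Same shape as the RowDataToBsonConverter interface shown in the diff above. */
    @FunctionalInterface
    interface RowDataToBsonConverter extends Serializable {
        BsonValue convert(Object value);
    }

    /** A NULL-type converter that returns the BsonNull singleton instead of a bare null. */
    static RowDataToBsonConverter createNullConverter() {
        return new RowDataToBsonConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public BsonValue convert(Object value) {
                // Returning BsonNull.VALUE keeps the result a real BsonValue,
                // so callers never see a Java null leaking out of the converter.
                return BsonNull.VALUE;
            }
        };
    }

    public static void main(String[] args) {
        BsonValue converted = createNullConverter().convert(null);
        System.out.println(converted.isNull()); // prints: true
    }
}
```

Note that the null-safe wrapper earlier in the diff already returns `BsonNull.VALUE` for nullable NULL-typed columns, so the inner converter's bare `null` is probably unreachable in practice; returning the singleton would mainly keep the internal converters consistent.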
########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table.converter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.bson.BsonArray; +import org.bson.BsonBinary; +import org.bson.BsonBoolean; +import org.bson.BsonDateTime; +import org.bson.BsonDecimal128; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonNull; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.json.JsonParseException; +import org.bson.types.Decimal128; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD; + +/** Tool class used to convert from {@link RowData} to {@link BsonValue}. */ +@Internal +public class RowDataToBsonConverters { + + // -------------------------------------------------------------------------------- + // Runtime Converters + // -------------------------------------------------------------------------------- + + /** + * Runtime converter that converts objects of Flink Table & SQL internal data structures to + * corresponding {@link BsonValue} data structures. + */ + @FunctionalInterface + public interface RowDataToBsonConverter extends Serializable { + BsonValue convert(Object value); + } Review Comment: Similar concerns as for BsonToRowDataConverters. ########## flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java: ########## @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.sink.writer; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions; +import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext; +import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema; +import org.apache.flink.connector.mongodb.testutils.MongoTestUtil; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.UserCodeClassLoader; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.DeleteOneModel; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.UpdateOneModel; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; +import org.bson.BsonDocument; +import org.bson.Document; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.Optional; +import java.util.OptionalLong; + +import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten; +import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MongoWriter}. 
*/ +@Testcontainers +public class MongoWriterITCase { + + private static final Logger LOG = LoggerFactory.getLogger(MongoWriterITCase.class); + + private static final String TEST_DATABASE = "test_writer"; + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .build()); + + @Container + private static final MongoDBContainer MONGO_CONTAINER = + MongoTestUtil.createMongoDBContainer(LOG); + + private static MongoClient mongoClient; + private static MetricListener metricListener; + + @BeforeAll + static void beforeAll() { + mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString()); + } + + @AfterAll + static void afterAll() { + if (mongoClient != null) { + mongoClient.close(); + } + } + + @BeforeEach + void setUp() { + metricListener = new MetricListener(); + } + + @Test + void testWriteOnBulkFlush() throws Exception { + final String collection = "test-bulk-flush-without-checkpoint"; + final boolean flushOnCheckpoint = false; + final int batchSize = 5; + final int batchIntervalMs = -1; + + try (final MongoWriter<Document> writer = + createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) { + writer.write(buildMessage(1), null); + writer.write(buildMessage(2), null); + writer.write(buildMessage(3), null); + writer.write(buildMessage(4), null); + + // Ignore flush on checkpoint + writer.flush(false); + + assertThatIdsAreNotWritten(collectionOf(collection), 1, 2, 3, 4); + + // Trigger flush + writer.write(buildMessage(5), null); + assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5); + + writer.write(buildMessage(6), null); + assertThatIdsAreNotWritten(collectionOf(collection), 6); + + // Force flush + writer.doBulkWrite(); + assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5, 6); + } + } + + @Test + void testWriteOnBatchIntervalFlush() throws Exception { + final String collection = "test-bulk-flush-with-interval"; + final boolean flushOnCheckpoint = false; + final int batchSize = -1; + final int batchIntervalMs = 1000; + + try (final MongoWriter<Document> writer = + createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) { + writer.write(buildMessage(1), null); + writer.write(buildMessage(2), null); + writer.write(buildMessage(3), null); + writer.write(buildMessage(4), null); + writer.doBulkWrite(); + } + + assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4); + } + + @Test + void testWriteOnCheckpoint() throws Exception { + final String collection = "test-bulk-flush-with-checkpoint"; + final boolean flushOnCheckpoint = true; + final int batchSize = -1; + final int batchIntervalMs = -1; + + // Enable flush on checkpoint + try (final MongoWriter<Document> writer = + createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) { + writer.write(buildMessage(1), null); + writer.write(buildMessage(2), null); + writer.write(buildMessage(3), null); + + assertThatIdsAreNotWritten(collectionOf(collection), 1, 2, 3); + + // Trigger flush + writer.flush(false); + + assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3); + } + } + + @Test + void testIncrementRecordsSendMetric() throws Exception { + final String collection = "test-inc-records-send"; + final boolean flushOnCheckpoint = false; + final int batchSize = 2; + final int batchIntervalMs = -1; + + try (final MongoWriter<Document> writer = + createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) { + 
final Optional<Counter> recordsSend = + metricListener.getCounter(MetricNames.NUM_RECORDS_SEND); + writer.write(buildMessage(1), null); + // Update existing index + writer.write(buildMessage(2, "u"), null); + // Delete index + writer.write(buildMessage(3, "d"), null); + + writer.doBulkWrite(); + + assertThat(recordsSend.isPresent()).isTrue(); + assertThat(recordsSend.get().getCount()).isEqualTo(3L); + } + } + + @Test + void testCurrentSendTime() throws Exception { + final String collection = "test-current-send-time"; + final boolean flushOnCheckpoint = false; + final int batchSize = 2; + final int batchIntervalMs = -1; + + try (final MongoWriter<Document> writer = + createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) { + final Optional<Gauge<Long>> currentSendTime = + metricListener.getGauge("currentSendTime"); + + writer.write(buildMessage(1), null); + writer.write(buildMessage(2), null); + + writer.doBulkWrite(); + + assertThat(currentSendTime.isPresent()).isTrue(); + assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); Review Comment: This is basically guaranteed to fail at some point because we're dealing with such small time measurements. ########## flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java: ########## @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.mongodb.testutils.MongoTestUtil; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Filters; +import org.bson.BsonArray; +import org.bson.BsonBinary; +import org.bson.BsonBoolean; +import org.bson.BsonDateTime; +import org.bson.BsonDbPointer; +import org.bson.BsonDecimal128; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonJavaScript; +import org.bson.BsonJavaScriptWithScope; +import org.bson.BsonRegularExpression; +import org.bson.BsonString; +import org.bson.BsonSymbol; +import org.bson.BsonTimestamp; +import org.bson.Document; +import org.bson.types.Binary; +import org.bson.types.Decimal128; +import org.bson.types.ObjectId; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; + +import static org.apache.flink.table.api.Expressions.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT tests for {@link MongoDynamicTableSink}. 
*/ +@Testcontainers +public class MongoDynamicTableSinkITCase { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSinkITCase.class); + + @Container + private static final MongoDBContainer MONGO_CONTAINER = + MongoTestUtil.createMongoDBContainer(LOG); + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .build()); + + private MongoClient mongoClient; + + @BeforeEach + public void setUp() { + mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString()); + } + + @AfterEach + public void tearDown() { + if (mongoClient != null) { + mongoClient.close(); + } + } + + @Test + public void testSinkWithAllSupportedTypes() throws ExecutionException, InterruptedException { + String database = "test"; + String collection = "sink_with_all_supported_types"; + + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.join( + "\n", + Arrays.asList( + "CREATE TABLE mongo_sink", + "(", + " _id BIGINT,", + " f1 STRING,", + " f2 BOOLEAN,", + " f3 BINARY,", + " f4 INTEGER,", + " f5 TIMESTAMP_LTZ(6),", + " f6 TIMESTAMP(3),", + " f7 DOUBLE,", + " f8 DECIMAL(10, 2),", + " f9 MAP<STRING, INTEGER>,", + " f10 ROW<k INTEGER>,", + " f11 ARRAY<STRING>,", + " f12 ARRAY<ROW<k STRING>>,", + " PRIMARY KEY (_id) NOT ENFORCED", + ") WITH (", + getConnectorSql(database, collection), + ")"))); + + Instant now = Instant.now(); + tEnv.fromValues( + DataTypes.ROW( + DataTypes.FIELD("_id", DataTypes.BIGINT()), + DataTypes.FIELD("f1", DataTypes.STRING()), + DataTypes.FIELD("f2", DataTypes.BOOLEAN()), + DataTypes.FIELD("f3", DataTypes.BINARY(1)), + DataTypes.FIELD("f4", DataTypes.INT()), + DataTypes.FIELD("f5", DataTypes.TIMESTAMP_LTZ(6)), + DataTypes.FIELD("f6", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("f7", DataTypes.DOUBLE()), + DataTypes.FIELD("f8", DataTypes.DECIMAL(10, 2)), + DataTypes.FIELD( + "f9", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())), + DataTypes.FIELD( + "f10", + DataTypes.ROW(DataTypes.FIELD("k", DataTypes.INT()))), + DataTypes.FIELD("f11", DataTypes.ARRAY(DataTypes.STRING())), + DataTypes.FIELD( + "f12", + DataTypes.ARRAY( + DataTypes.ROW( + DataTypes.FIELD( + "K", DataTypes.STRING()))))), + Row.of( + 1L, + "ABCDE", + true, + new byte[] {(byte) 3}, + 6, + now, + Timestamp.from(now), + 10.10d, + new BigDecimal("11.11"), + Collections.singletonMap("k", 12), + Row.of(13), + Arrays.asList("14_1", "14_2"), + Arrays.asList(Row.of("15_1"), Row.of("15_2")))) + .executeInsert("mongo_sink") + .await(); + + MongoCollection<Document> coll = + mongoClient.getDatabase(database).getCollection(collection); + + Document actual = coll.find(Filters.eq("_id", 1L)).first(); + + Document expected = + new Document("_id", 1L) + .append("f1", "ABCDE") + .append("f2", true) + .append("f3", new Binary(new byte[] {(byte) 3})) + .append("f4", 6) + .append("f5", Date.from(now)) + .append("f6", Date.from(now)) + .append("f7", 10.10d) + .append("f8", new Decimal128(new BigDecimal("11.11"))) + .append("f9", new Document("k", 12)) + .append("f10", new Document("k", 13)) + .append("f11", Arrays.asList("14_1", "14_2")) + .append( + "f12", + Arrays.asList( + new Document("k", "15_1"), new Document("k", "15_2"))); + + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testRoundTripReadAndSink() throws ExecutionException, InterruptedException { + String database = "test"; + String 
sourceCollection = "test_round_trip_source"; + String sinkCollection = "test_round_trip_sink"; + + BsonDocument testData = + new BsonDocument("f1", new BsonString("ABCDE")) + .append("f2", new BsonBoolean(true)) + .append("f3", new BsonBinary(new byte[] {(byte) 3})) + .append("f4", new BsonInt32(32)) + .append("f5", new BsonInt64(64L)) + .append("f6", new BsonDouble(128.128d)) + .append("f7", new BsonDecimal128(new Decimal128(new BigDecimal("256.256")))) + .append("f8", new BsonDateTime(Instant.now().toEpochMilli())) + .append("f9", new BsonTimestamp((int) Instant.now().getEpochSecond(), 100)) + .append( + "f10", + new BsonRegularExpression(Pattern.compile("^9$").pattern(), "i")) + .append("f11", new BsonJavaScript("function() { return 10; }")) + .append( + "f12", + new BsonJavaScriptWithScope( + "function() { return 11; }", new BsonDocument())) + .append("f13", new BsonDbPointer("test.test", new ObjectId())) + .append("f14", new BsonSymbol("symbol")) + .append( + "f15", + new BsonArray(Arrays.asList(new BsonInt32(1), new BsonInt32(2)))) + .append("f16", new BsonDocument("k", new BsonInt32(32))); + + MongoCollection<BsonDocument> sourceColl = + mongoClient + .getDatabase(database) + .getCollection(sourceCollection) + .withDocumentClass(BsonDocument.class); + sourceColl.insertOne(testData); + + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.format( + "CREATE TABLE mongo_source (\n" + + "`_id` STRING,\n" + + "`f1` STRING,\n" + + "`f2` BOOLEAN,\n" + + "`f3` BINARY,\n" + + "`f4` INTEGER,\n" + + "`f5` BIGINT,\n" + + "`f6` DOUBLE,\n" + + "`f7` DECIMAL(10, 3),\n" + + "`f8` TIMESTAMP_LTZ(3),\n" + + "`f9` STRING,\n" + + "`f10` STRING,\n" + + "`f11` STRING,\n" + + "`f12` STRING,\n" + + "`f13` STRING,\n" + + "`f14` STRING,\n" + + "`f15` ARRAY<INTEGER>,\n" + + "`f16` ROW<k INTEGER>,\n" + + " PRIMARY KEY (_id) NOT ENFORCED\n" + + ") WITH ( %s )", + getConnectorSql(database, sourceCollection))); + + tEnv.executeSql( + String.format( + "CREATE TABLE mongo_sink WITH ( %s ) LIKE mongo_source", + getConnectorSql(database, sinkCollection))); + + tEnv.executeSql("insert into mongo_sink select * from mongo_source").await(); + + MongoCollection<BsonDocument> sinkColl = + mongoClient + .getDatabase(database) + .getCollection(sinkCollection) + .withDocumentClass(BsonDocument.class); + + BsonDocument actual = sinkColl.find().first(); + + assertThat(actual).isEqualTo(testData); + } + + @Test + public void testSinkWithAllRowKind() throws ExecutionException, InterruptedException { + String database = "test"; + String collection = "test_sink_with_all_row_kind"; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + DataStream<Row> sourceStream = + env.fromCollection( + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1L, "Alice"), + Row.ofKind(RowKind.DELETE, 1L, "Alice"), + Row.ofKind(RowKind.INSERT, 2L, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 2L, "Bob"), + Row.ofKind(RowKind.UPDATE_AFTER, 2L, "Tom"))) + .returns( + new RowTypeInfo( + new TypeInformation[] {Types.LONG, Types.STRING}, + new String[] {"id", "name"})); + + Schema sourceSchema = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .build(); + + Table sourceTable = tEnv.fromChangelogStream(sourceStream, sourceSchema); + tEnv.createTemporaryView("value_source", sourceTable); + + tEnv.executeSql( + String.format( + "CREATE TABLE 
mongo_sink (\n" + + "`_id` BIGINT,\n" + + "`name` STRING,\n" + + " PRIMARY KEY (_id) NOT ENFORCED\n" + + ") WITH ( %s )", + getConnectorSql(database, collection))); + + tEnv.executeSql("insert into mongo_sink select * from value_source").await(); + + MongoCollection<Document> coll = + mongoClient.getDatabase(database).getCollection(collection); + + List<Document> expected = + Collections.singletonList(new Document("_id", 2L).append("name", "Tom")); + + List<Document> actual = coll.find().into(new ArrayList<>()); + + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testSinkWithReservedId() throws Exception { + String database = "test"; + String collection = "sink_with_reserved_id"; + + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.format( + "CREATE TABLE mongo_sink (" + + "_id STRING NOT NULL,\n" + + "f1 STRING NOT NULL,\n" + + "PRIMARY KEY (_id) NOT ENFORCED\n" + + ")\n" + + "WITH (%s)", + getConnectorSql(database, collection))); + + ObjectId objectId = new ObjectId(); + tEnv.fromValues(row(objectId.toHexString(), "r1"), row("str", "r2")) + .executeInsert("mongo_sink") + .await(); + + MongoCollection<Document> coll = + mongoClient.getDatabase(database).getCollection(collection); + + List<Document> actual = new ArrayList<>(); + coll.find(Filters.in("_id", objectId, "str")).into(actual); + + Document[] expected = + new Document[] { + new Document("_id", objectId).append("f1", "r1"), + new Document("_id", "str").append("f1", "r2") + }; + assertThat(actual).containsExactlyInAnyOrder(expected); + } + + @Test + public void testSinkWithoutPrimaryKey() throws Exception { + String database = "test"; + String collection = "sink_without_primary_key"; + + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.format( + "CREATE TABLE mongo_sink (" + "f1 STRING NOT NULL\n" + ")\n" + "WITH (%s)", + getConnectorSql(database, collection))); + + tEnv.fromValues(row("d1"), row("d1")).executeInsert("mongo_sink").await(); + + MongoCollection<Document> coll = + mongoClient.getDatabase(database).getCollection(collection); + + List<Document> actual = new ArrayList<>(); + coll.find().into(actual); + + assertThat(actual).hasSize(2); + for (Document doc : actual) { + assertThat(doc.get("f1")).isEqualTo("d1"); + } + } + + @Test + public void testSinkWithNonCompositePrimaryKey() throws Exception { + String database = "test"; + String collection = "sink_with_non_composite_pk"; + + Instant now = Instant.now(); + List<Expression> testValues = + Collections.singletonList( + row(1L, true, "ABCDE", 12.12d, 4, Timestamp.from(now), now)); Review Comment: ```suggestion row(2L, true, "ABCDE", 12.12d, 4, Timestamp.from(now), now)); ``` Let's use some value that doesnt happen to coincide with the generated(?) `_id`. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoScanSplitAssigner.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState; +import org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitters; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; + +import com.mongodb.MongoNamespace; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** The split assigner for {@link MongoScanSourceSplit}. */ +@Internal +public class MongoScanSplitAssigner implements MongoSplitAssigner { + + private static final Logger LOG = LoggerFactory.getLogger(MongoScanSplitAssigner.class); + + private final MongoConnectionOptions connectionOptions; + private final MongoReadOptions readOptions; + + private final LinkedList<String> remainingCollections; + private final List<String> alreadyProcessedCollections; + private final List<MongoScanSourceSplit> remainingScanSplits; + private final Map<String, MongoScanSourceSplit> assignedScanSplits; + private boolean initialized; + + private transient MongoSplitters mongoSplitters; + + public MongoScanSplitAssigner( + MongoConnectionOptions connectionOptions, + MongoReadOptions readOptions, + MongoSourceEnumState sourceEnumState) { + this.connectionOptions = connectionOptions; + this.readOptions = readOptions; + this.remainingCollections = new LinkedList<>(sourceEnumState.getRemainingCollections()); + this.alreadyProcessedCollections = sourceEnumState.getAlreadyProcessedCollections(); + this.remainingScanSplits = sourceEnumState.getRemainingScanSplits(); + this.assignedScanSplits = sourceEnumState.getAssignedScanSplits(); + this.initialized = sourceEnumState.isInitialized(); + } + + @Override + public void open() { + LOG.info("Mongo scan split assigner is opening."); + if (!initialized) { + String collectionId = + String.format( + "%s.%s", + connectionOptions.getDatabase(), connectionOptions.getCollection()); + remainingCollections.add(collectionId); + mongoSplitters = new MongoSplitters(connectionOptions, readOptions); + initialized = true; + } + } + + @Override + public Optional<MongoSourceSplit> getNext() { + if (!remainingScanSplits.isEmpty()) { + // return remaining splits firstly + Iterator<MongoScanSourceSplit> iterator = remainingScanSplits.iterator(); + MongoScanSourceSplit split = iterator.next(); + iterator.remove(); + assignedScanSplits.put(split.splitId(), split); + return Optional.of(split); + } else { + // it's turn for next collection + String nextCollection = remainingCollections.pollFirst(); + if (nextCollection != null) { + // split the given collection into chunks (scan splits) + 
Collection<MongoScanSourceSplit> splits = + mongoSplitters.split(new MongoNamespace(nextCollection)); + remainingScanSplits.addAll(splits); + alreadyProcessedCollections.add(nextCollection); + return getNext(); + } else { + return Optional.empty(); + } + } + } + + @Override + public void addSplitsBack(Collection<MongoSourceSplit> splits) { + for (MongoSourceSplit split : splits) { + if (split instanceof MongoScanSourceSplit) { + remainingScanSplits.add((MongoScanSourceSplit) split); + // we should remove the add-backed splits from the assigned list, + // because they are failed + assignedScanSplits.remove(split.splitId()); + } + } + } + + @Override + public MongoSourceEnumState snapshotState(long checkpointId) { + return new MongoSourceEnumState( + remainingCollections, + alreadyProcessedCollections, + remainingScanSplits, + assignedScanSplits, + initialized); + } + + @Override + public boolean noMoreSplits() { + return initialized && remainingCollections.isEmpty() && remainingScanSplits.isEmpty(); + } Review Comment: I'd rather add a Precondition that initialized is true. We don't want this to be called in any other situation, and as-is we might mask such a problem. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoSplitAssigner.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Optional; + +/** The split assigner for {@link MongoSourceSplit}. */ +@Internal +public interface MongoSplitAssigner extends Serializable { + + /** + * Called to open the assigner to acquire any resources, like threads or network connections. + */ + void open(); + + /** + * Called to close the assigner, in case it holds on to any resources, like threads or network + * connections. + */ + void close() throws IOException; + + /** Gets the next split. */ + Optional<MongoSourceSplit> getNext(); Review Comment: Document that. Specifically it's relationship to `noMoreSplits()`. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/serialization/MongoRowDataDeserializationSchema.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table.serialization; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema; +import org.apache.flink.connector.mongodb.table.converter.BsonToRowDataConverters; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import org.bson.BsonDocument; + +/** Deserializer that {@link BsonDocument} to flink internal {@link RowData}. */ Review Comment: ```suggestion /** Deserializer that maps {@link BsonDocument} to {@link RowData}. */ ``` ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.common.utils.MongoUtils; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; +import org.apache.flink.util.FlinkRuntimeException; + +import com.mongodb.MongoException; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +/** To split collections of MongoDB to {@link MongoSourceSplit}s. 
*/ +@Internal +public class MongoSplitters implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(MongoSplitters.class); + + private final MongoReadOptions readOptions; + private final MongoClient mongoClient; + + public MongoSplitters(MongoConnectionOptions connectionOptions, MongoReadOptions readOptions) { + this.readOptions = readOptions; + this.mongoClient = MongoClients.create(connectionOptions.getUri()); + } + + public Collection<MongoScanSourceSplit> split(MongoNamespace namespace) { + BsonDocument collStats; + try { + collStats = MongoUtils.collStats(mongoClient, namespace); + } catch (MongoException e) { + LOG.error("Execute collStats command failed, with error message: {}", e.getMessage()); + throw new FlinkRuntimeException(e); + } + + MongoSplitContext splitContext = + MongoSplitContext.of(readOptions, mongoClient, namespace, collStats); + + switch (readOptions.getPartitionStrategy()) { + case SINGLE: + return MongoSingleSplitter.INSTANCE.split(splitContext); Review Comment: These could all be static methods; the singleton pattern doesn't really give us any benefit. ########## flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.connector.mongodb.testutils; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +/** Sharded Containers. 
*/ +public class MongoShardedContainers implements BeforeAllCallback, AfterAllCallback { + + private static final Logger LOG = LoggerFactory.getLogger(MongoShardedContainers.class); + + private static final int MONGODB_INTERNAL_PORT = 27017; + + private static final String CONFIG_REPLICA_SET_NAME = "rs-config-0"; + private static final String SHARD_REPLICA_SET_NAME = "rs-shard-0"; + + private static final String CONFIG_HOSTNAME = "config-0"; + private static final String SHARD_HOSTNAME = "shard-0"; + private static final String ROUTER_HOSTNAME = "router-0"; + + private final MongoDBContainer configSrv; + private final MongoDBContainer shardSrv; + private final MongoDBContainer router; + + MongoShardedContainers(DockerImageName dockerImageName, Network network) { + Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG); + this.configSrv = + new MongoDBContainer(dockerImageName) + .withCreateContainerCmdModifier(it -> it.withHostName(CONFIG_HOSTNAME)) + .withCommand( + "-configsvr", + "--replSet", + CONFIG_REPLICA_SET_NAME, + "--port", + String.valueOf(MONGODB_INTERNAL_PORT)) + .withNetwork(network) + .withNetworkAliases(CONFIG_HOSTNAME) + .withLogConsumer(logConsumer); + this.shardSrv = + new MongoDBContainer(dockerImageName) + .withCreateContainerCmdModifier(it -> it.withHostName(SHARD_HOSTNAME)) + .withCommand( + "-shardsvr", + "--replSet", + SHARD_REPLICA_SET_NAME, + "--port", + String.valueOf(MONGODB_INTERNAL_PORT)) + .withNetwork(network) + .withNetworkAliases(SHARD_HOSTNAME) + .withLogConsumer(logConsumer); + this.router = + new MongoRouterContainer(dockerImageName) + .withCreateContainerCmdModifier(it -> it.withHostName(ROUTER_HOSTNAME)) + .dependsOn(configSrv, shardSrv) + .withNetwork(network) + .withNetworkAliases(ROUTER_HOSTNAME) + .withLogConsumer(logConsumer); + } + + public void start() { + LOG.info("Starting ConfigSrv container"); + configSrv.start(); + LOG.info("Starting ShardSrv container"); + shardSrv.start(); + LOG.info("Starting Router containers"); + router.start(); + } + + public void close() { + router.stop(); + shardSrv.stop(); + configSrv.stop(); + } Review Comment: Why are these public? can we just inline them into the callback methods? ########## flink-connector-mongodb/src/test/resources/log4j2-test.properties: ########## @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = INFO Review Comment: ```suggestion rootLogger.level = OFF ``` ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table.converter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.bson.BsonArray; +import org.bson.BsonBinary; +import org.bson.BsonBoolean; +import org.bson.BsonDateTime; +import org.bson.BsonDecimal128; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonNull; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.json.JsonParseException; +import org.bson.types.Decimal128; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ENCODE_VALUE_FIELD; + +/** Tool class used to convert from {@link RowData} to {@link BsonValue}. */ +@Internal +public class RowDataToBsonConverters { + + // -------------------------------------------------------------------------------- + // Runtime Converters + // -------------------------------------------------------------------------------- + + /** + * Runtime converter that converts objects of Flink Table & SQL internal data structures to + * corresponding {@link BsonValue} data structures. + */ + @FunctionalInterface + public interface RowDataToBsonConverter extends Serializable { + BsonValue convert(Object value); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). 
On the other hand we want to relocate Bson for + // sql-connector uber jars. + // -------------------------------------------------------------------------------- + + public static RowDataToBsonConverter createConverter(LogicalType type) { Review Comment: Should only accept RowData ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy; + +import java.time.Duration; + +/** + * Base options for the MongoDB connector. Needs to be public so that the {@link + * org.apache.flink.table.api.TableDescriptor} can access it. + */ +@PublicEvolving +public class MongoConnectorOptions { + + private MongoConnectorOptions() {} + + public static final ConfigOption<String> URI = + ConfigOptions.key("uri") + .stringType() + .noDefaultValue() + .withDescription("Specifies the connection uri of MongoDB."); + + public static final ConfigOption<String> DATABASE = + ConfigOptions.key("database") + .stringType() + .noDefaultValue() + .withDescription("Specifies the database to read or write of MongoDB."); + + public static final ConfigOption<String> COLLECTION = + ConfigOptions.key("collection") + .stringType() + .noDefaultValue() + .withDescription("Specifies the collection to read or write of MongoDB."); + + public static final ConfigOption<Integer> SCAN_FETCH_SIZE = + ConfigOptions.key("scan.fetch-size") + .intType() + .defaultValue(2048) + .withDescription( + "Gives the reader a hint as to the number of documents that should be fetched from the database per round-trip when reading. "); + + public static final ConfigOption<Integer> SCAN_CURSOR_BATCH_SIZE = Review Comment: This isn't enough information for users, and current descriptions are highly misleading. scan size states "the number of documents that should be fetched from the database", but it's rather the number of records _consumed_ in one fetch() call from one or more retrieved batches. But this is a highly technical description that users won't know what to do with. We just have to expand a bit on the trade-offs. 
Batch size reduces the # of calls to MongoDB at the cost of higher memory consumption in Flink and higher risk of throwing away fetched data (when fetch-size is significantly lower and a failure occurs), while fetch size is all about keeping the source responsive to checkpointing. I do wonder though whether fetch-size should be necessary. To me it really points to a problem with how `fetch()` is implemented / designed. What would be the overhead of just exiting fetch() every 10 records? ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.utils.MongoValidationUtils; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.RowData.FieldGetter; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.function.SerializableFunction; + +import org.bson.BsonObjectId; +import org.bson.BsonValue; +import org.bson.types.ObjectId; + +import java.util.Optional; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** An extractor for a MongoDB key from a {@link RowData}. 
*/ +@Internal +public class MongoKeyExtractor implements SerializableFunction<RowData, BsonValue> { + + public static final String RESERVED_ID = ID_FIELD; + + private static final AppendOnlyKeyExtractor APPEND_ONLY_KEY_EXTRACTOR = + new AppendOnlyKeyExtractor(); + + private final int[] primaryKeyIndexes; + + private final RowDataToBsonConverter primaryKeyConverter; + + private final FieldGetter primaryKeyGetter; + + private MongoKeyExtractor(LogicalType primaryKeyType, int[] primaryKeyIndexes) { + this.primaryKeyIndexes = primaryKeyIndexes; + this.primaryKeyConverter = RowDataToBsonConverters.createConverter(primaryKeyType); + if (isCompoundPrimaryKey(primaryKeyIndexes)) { + this.primaryKeyGetter = + rowData -> ProjectedRowData.from(primaryKeyIndexes).replaceRow(rowData); + } else { + this.primaryKeyGetter = RowData.createFieldGetter(primaryKeyType, primaryKeyIndexes[0]); + } + } + + @Override + public BsonValue apply(RowData rowData) { + Object rowKeyValue = primaryKeyGetter.getFieldOrNull(rowData); + checkNotNull(rowKeyValue, "Primary key value is null of RowData: " + rowData); + BsonValue keyValue = primaryKeyConverter.convert(rowKeyValue); + if (!isCompoundPrimaryKey(primaryKeyIndexes) && keyValue.isString()) { + String keyString = keyValue.asString().getValue(); + // Try to restore MongoDB's ObjectId from string. + if (ObjectId.isValid(keyString)) { + keyValue = new BsonObjectId(new ObjectId(keyString)); + } + } + return keyValue; + } + + public static SerializableFunction<RowData, BsonValue> createKeyExtractor( + ResolvedSchema resolvedSchema) { + + Optional<UniqueConstraint> primaryKey = resolvedSchema.getPrimaryKey(); + int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes(); + Optional<Column> reservedId = resolvedSchema.getColumn(RESERVED_ID); + + // Primary key is not declared and reserved _id is not present. + if (!primaryKey.isPresent() && !reservedId.isPresent()) { + return APPEND_ONLY_KEY_EXTRACTOR; + } + + if (reservedId.isPresent()) { + // Ambiguous keys being used due to the presence of an _id field. + if (!primaryKey.isPresent() + || isCompoundPrimaryKey(primaryKeyIndexes) + || !primaryKeyContainsReservedId(primaryKey.get())) { + throw new IllegalArgumentException( + "Ambiguous keys being used due to the presence of an _id field."); Review Comment: Add instructions for how to resolve that. For example, "Either use the _id column as the key, or rename the _id column". ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.utils.MongoValidationUtils; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.RowData.FieldGetter; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.function.SerializableFunction; + +import org.bson.BsonObjectId; +import org.bson.BsonValue; +import org.bson.types.ObjectId; + +import java.util.Optional; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** An extractor for a MongoDB key from a {@link RowData}. */ +@Internal +public class MongoKeyExtractor implements SerializableFunction<RowData, BsonValue> { + + public static final String RESERVED_ID = ID_FIELD; + + private static final AppendOnlyKeyExtractor APPEND_ONLY_KEY_EXTRACTOR = + new AppendOnlyKeyExtractor(); + + private final int[] primaryKeyIndexes; + + private final RowDataToBsonConverter primaryKeyConverter; + + private final FieldGetter primaryKeyGetter; + + private MongoKeyExtractor(LogicalType primaryKeyType, int[] primaryKeyIndexes) { + this.primaryKeyIndexes = primaryKeyIndexes; + this.primaryKeyConverter = RowDataToBsonConverters.createConverter(primaryKeyType); + if (isCompoundPrimaryKey(primaryKeyIndexes)) { + this.primaryKeyGetter = + rowData -> ProjectedRowData.from(primaryKeyIndexes).replaceRow(rowData); + } else { + this.primaryKeyGetter = RowData.createFieldGetter(primaryKeyType, primaryKeyIndexes[0]); + } + } + + @Override + public BsonValue apply(RowData rowData) { + Object rowKeyValue = primaryKeyGetter.getFieldOrNull(rowData); + checkNotNull(rowKeyValue, "Primary key value is null of RowData: " + rowData); + BsonValue keyValue = primaryKeyConverter.convert(rowKeyValue); + if (!isCompoundPrimaryKey(primaryKeyIndexes) && keyValue.isString()) { + String keyString = keyValue.asString().getValue(); + // Try to restore MongoDB's ObjectId from string. + if (ObjectId.isValid(keyString)) { + keyValue = new BsonObjectId(new ObjectId(keyString)); + } + } + return keyValue; + } + + public static SerializableFunction<RowData, BsonValue> createKeyExtractor( + ResolvedSchema resolvedSchema) { + + Optional<UniqueConstraint> primaryKey = resolvedSchema.getPrimaryKey(); + int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes(); + Optional<Column> reservedId = resolvedSchema.getColumn(RESERVED_ID); + + // Primary key is not declared and reserved _id is not present. + if (!primaryKey.isPresent() && !reservedId.isPresent()) { + return APPEND_ONLY_KEY_EXTRACTOR; + } + + if (reservedId.isPresent()) { + // Ambiguous keys being used due to the presence of an _id field. 
+ if (!primaryKey.isPresent() + || isCompoundPrimaryKey(primaryKeyIndexes) + || !primaryKeyContainsReservedId(primaryKey.get())) { + throw new IllegalArgumentException( + "Ambiguous keys being used due to the presence of an _id field."); + } + } + + DataType primaryKeyType; + if (isCompoundPrimaryKey(primaryKeyIndexes)) { + DataType physicalRowDataType = resolvedSchema.toPhysicalRowDataType(); + primaryKeyType = Projection.of(primaryKeyIndexes).project(physicalRowDataType); + } else { + int primaryKeyIndex = primaryKeyIndexes[0]; + Optional<Column> column = resolvedSchema.getColumn(primaryKeyIndex); + if (!column.isPresent()) { + throw new IllegalStateException( + String.format( + "No primary key column found with index '%s'.", primaryKeyIndex)); + } + primaryKeyType = column.get().getDataType(); + } + + MongoValidationUtils.validatePrimaryKey(primaryKeyType); + + return new MongoKeyExtractor(primaryKeyType.getLogicalType(), primaryKeyIndexes); + } + + private static boolean isCompoundPrimaryKey(int[] primaryKeyIndexes) { + return primaryKeyIndexes.length > 1; + } + + private static boolean primaryKeyContainsReservedId(UniqueConstraint primaryKey) { + return primaryKey.getColumns().contains(RESERVED_ID); + } + + /** + * It behaves as append-only when no primary key is declared and reserved _id is not present. We + * use static class instead of lambda for a reason here. It is necessary because the maven shade Review Comment: ```suggestion * use static class instead of lambda because the maven shade ``` ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.utils.MongoValidationUtils; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.RowData.FieldGetter; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.function.SerializableFunction; + +import org.bson.BsonObjectId; +import org.bson.BsonValue; +import org.bson.types.ObjectId; + +import java.util.Optional; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** An extractor for a MongoDB key from a {@link RowData}. */ +@Internal +public class MongoKeyExtractor implements SerializableFunction<RowData, BsonValue> { Review Comment: missing SerialVersion UID ########## flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_BATCH_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SAMPLES; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_STRATEGY; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Test for {@link MongoDynamicTableSource} and {@link MongoDynamicTableSink} created by {@link + * MongoDynamicTableFactory}. 
+ */ +public class MongoDynamicTableFactoryTest { + + private static final ResolvedSchema SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("aaa", DataTypes.INT().notNull()), + Column.physical("bbb", DataTypes.STRING().notNull()), + Column.physical("ccc", DataTypes.DOUBLE()), + Column.physical("ddd", DataTypes.DECIMAL(31, 18)), + Column.physical("eee", DataTypes.TIMESTAMP(3))), + Collections.emptyList(), + UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa"))); + + @Test + public void testMongoCommonProperties() { + Map<String, String> properties = getRequiredOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(SCHEMA, properties); + + MongoConnectionOptions connectionOptions = + MongoConnectionOptions.builder() + .setUri("mongodb://127.0.0.1:27017") + .setDatabase("test_db") + .setCollection("test_coll") + .build(); + + MongoDynamicTableSource expectedSource = + new MongoDynamicTableSource( + connectionOptions, + MongoReadOptions.builder().build(), + null, + LookupOptions.MAX_RETRIES.defaultValue(), + LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(), + SCHEMA.toPhysicalRowDataType()); + assertThat(actualSource).isEqualTo(expectedSource); + + // validation for sink Review Comment: split the test case instead of adding such comments. ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; + +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.Projections; +import com.mongodb.client.model.Sorts; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; + +/** + * Sample Partitioner + * + * <p>Samples the collection to generate partitions. 
+ * + * <p>Uses the average document size to split the collection into average sized chunks + * + * <p>The partitioner samples the collection, projects and sorts by the partition fields. Then uses + * every {@code samplesPerPartition} as the value to use to calculate the partition boundaries. + * + * <ul> + * <li>scan.partition.size: The average size (MB) for each partition. Note: Uses the average + * document size to determine the number of documents per partition so may not be even. + * Defaults to: 64mb. + * <li>scan.partition.samples: The number of samples to take per partition. Defaults to: 10. The + * total number of samples taken is calculated as: {@code samples per partition * (count of + * documents / number of documents per partition)}. Review Comment: Why aren't we just doing this? ``` // the number of elements we want each split to read numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes sampleCount = count / numDocumentsPerPartition totalSplitCount = sampleCount + 1 ``` What are multiple sample per split supposed to give us? Is the MongoDB sampling not good enough that we need a higher resolution? ########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/deserializer/MongoJsonDeserializationSchema.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.reader.deserializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.bson.BsonDocument; +import org.bson.json.JsonMode; + +import java.util.Optional; + +/** + * A schema bridge for deserializing the MongoDB's {@code BsonDocument} to MongoDB's {@link + * JsonMode}'s RELAXED Json string. + */ +@PublicEvolving +public class MongoJsonDeserializationSchema implements MongoDeserializationSchema<String> { Review Comment: > Used to convert BsonDocument to Json format That's not a use-case, but what it does. Who do we expect to use it? Is it only for testing? Production usage? ########## flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_BATCH_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SAMPLES; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_STRATEGY; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Test for {@link MongoDynamicTableSource} and {@link MongoDynamicTableSink} 
created by {@link + * MongoDynamicTableFactory}. + */ +public class MongoDynamicTableFactoryTest { + + private static final ResolvedSchema SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("aaa", DataTypes.INT().notNull()), + Column.physical("bbb", DataTypes.STRING().notNull()), + Column.physical("ccc", DataTypes.DOUBLE()), + Column.physical("ddd", DataTypes.DECIMAL(31, 18)), + Column.physical("eee", DataTypes.TIMESTAMP(3))), + Collections.emptyList(), + UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa"))); + + @Test + public void testMongoCommonProperties() { + Map<String, String> properties = getRequiredOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(SCHEMA, properties); + + MongoConnectionOptions connectionOptions = + MongoConnectionOptions.builder() + .setUri("mongodb://127.0.0.1:27017") + .setDatabase("test_db") + .setCollection("test_coll") + .build(); + + MongoDynamicTableSource expectedSource = + new MongoDynamicTableSource( + connectionOptions, + MongoReadOptions.builder().build(), + null, + LookupOptions.MAX_RETRIES.defaultValue(), + LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(), + SCHEMA.toPhysicalRowDataType()); + assertThat(actualSource).isEqualTo(expectedSource); + + // validation for sink + DynamicTableSink actualSink = createTableSink(SCHEMA, properties); + + MongoWriteOptions writeOptions = MongoWriteOptions.builder().build(); + MongoDynamicTableSink expectedSink = + new MongoDynamicTableSink( + connectionOptions, + writeOptions, + null, + SCHEMA.getPrimaryKey().isPresent(), + SCHEMA.toPhysicalRowDataType(), + MongoKeyExtractor.createKeyExtractor(SCHEMA)); + assertThat(actualSink).isEqualTo(expectedSink); + } + + @Test + public void testMongoReadProperties() { + Map<String, String> properties = getRequiredOptions(); + properties.put(SCAN_FETCH_SIZE.key(), "1024"); + properties.put(SCAN_CURSOR_BATCH_SIZE.key(), "2048"); + properties.put(SCAN_CURSOR_NO_TIMEOUT.key(), "false"); + properties.put(SCAN_PARTITION_STRATEGY.key(), "split-vector"); + properties.put(SCAN_PARTITION_SIZE.key(), "128m"); + properties.put(SCAN_PARTITION_SAMPLES.key(), "5"); + + DynamicTableSource actual = createTableSource(SCHEMA, properties); + + MongoConnectionOptions connectionOptions = + MongoConnectionOptions.builder() + .setUri("mongodb://127.0.0.1:27017") + .setDatabase("test_db") + .setCollection("test_coll") + .build(); + MongoReadOptions readOptions = + MongoReadOptions.builder() + .setFetchSize(1024) + .setCursorBatchSize(2048) + .setNoCursorTimeout(false) + .setPartitionStrategy(PartitionStrategy.SPLIT_VECTOR) + .setPartitionSize(MemorySize.ofMebiBytes(128)) + .setSamplesPerPartition(5) + .build(); + + MongoDynamicTableSource expected = + new MongoDynamicTableSource( + connectionOptions, + readOptions, + null, + LookupOptions.MAX_RETRIES.defaultValue(), + LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(), + SCHEMA.toPhysicalRowDataType()); + + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testMongoLookupProperties() { + Map<String, String> properties = getRequiredOptions(); + properties.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL"); + properties.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "10s"); + properties.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "20s"); + properties.put(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY.key(), "false"); + properties.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "15213"); + properties.put(LookupOptions.MAX_RETRIES.key(), "10"); + 
properties.put(LOOKUP_RETRY_INTERVAL.key(), "20ms"); + + DynamicTableSource actual = createTableSource(SCHEMA, properties); + + MongoConnectionOptions connectionOptions = + MongoConnectionOptions.builder() + .setUri("mongodb://127.0.0.1:27017") + .setDatabase("test_db") + .setCollection("test_coll") + .build(); + + MongoDynamicTableSource expected = + new MongoDynamicTableSource( + connectionOptions, + MongoReadOptions.builder().build(), + DefaultLookupCache.fromConfig(Configuration.fromMap(properties)), + 10, + 20, + SCHEMA.toPhysicalRowDataType()); + + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testMongoSinkProperties() { + Map<String, String> properties = getRequiredOptions(); + properties.put(BUFFER_FLUSH_MAX_ROWS.key(), "1001"); + properties.put(BUFFER_FLUSH_INTERVAL.key(), "2min"); + properties.put(DELIVERY_GUARANTEE.key(), "at-least-once"); + properties.put(SINK_MAX_RETRIES.key(), "5"); + properties.put(SINK_RETRY_INTERVAL.key(), "2s"); + + DynamicTableSink actual = createTableSink(SCHEMA, properties); + + MongoConnectionOptions connectionOptions = + MongoConnectionOptions.builder() + .setUri("mongodb://127.0.0.1:27017") + .setDatabase("test_db") + .setCollection("test_coll") + .build(); + MongoWriteOptions writeOptions = + MongoWriteOptions.builder() + .setBatchSize(1001) + .setBatchIntervalMs(TimeUnit.MINUTES.toMillis(2)) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setMaxRetries(5) + .setRetryIntervalMs(TimeUnit.SECONDS.toMillis(2)) + .build(); + + MongoDynamicTableSink expected = + new MongoDynamicTableSink( + connectionOptions, + writeOptions, + null, + SCHEMA.getPrimaryKey().isPresent(), + SCHEMA.toPhysicalRowDataType(), + MongoKeyExtractor.createKeyExtractor(SCHEMA)); + + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testMongoSinkWithParallelism() { + Map<String, String> properties = getRequiredOptions(); + properties.put("sink.parallelism", "2"); + + DynamicTableSink actual = createTableSink(SCHEMA, properties); + + MongoConnectionOptions connectionOptions = + MongoConnectionOptions.builder() + .setUri("mongodb://127.0.0.1:27017") + .setDatabase("test_db") + .setCollection("test_coll") + .build(); + + MongoWriteOptions writeOptions = MongoWriteOptions.builder().build(); + + MongoDynamicTableSink expected = + new MongoDynamicTableSink( + connectionOptions, + writeOptions, + 2, + SCHEMA.getPrimaryKey().isPresent(), + SCHEMA.toPhysicalRowDataType(), + MongoKeyExtractor.createKeyExtractor(SCHEMA)); + + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testMongoValidation() { + // fetch size lower than 1 + Map<String, String> properties = getRequiredOptions(); + properties.put(SCAN_FETCH_SIZE.key(), "0"); + + Map<String, String> finalProperties1 = properties; Review Comment: You could make this test a lot nicer with a utility method like `getRequiredOptionsWithSetting(String, String)`. ``` assertThatThrownBy(() -> createTableSource(SCHEMA, getRequiredOptionsWithSetting(SCAN_FETCH_SIZE.key(), "0")) .hasStackTraceContaining("The fetch size must be larger than 0."); ``` Bonus points for adding another `assertValidationRejects(String, String,String)` method. ``` assertValidationRejects(SCAN_FETCH_SIZE.key(), "0", "The fetch size must be larger than 0."); ``` ########## flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.sink; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.mongodb.MongoTestUtil; +import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext; +import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.WriteModel; +import org.bson.BsonDocument; +import org.bson.Document; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import javax.annotation.Nullable; + +import static org.apache.flink.connector.mongodb.MongoTestUtil.assertThatIdsAreWritten; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for {@link MongoSink}. 
*/ +@Testcontainers +public class MongoSinkITCase { + + private static final Logger LOG = LoggerFactory.getLogger(MongoSinkITCase.class); + + @Container + private static final MongoDBContainer MONGO_CONTAINER = + MongoTestUtil.createMongoDBContainer(LOG); + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setConfiguration(new Configuration()) + .build()); + + private static final String TEST_DATABASE = "test_sink"; + + private static boolean failed; + + private static MongoClient mongoClient; + + @BeforeAll + static void setUp() { + failed = false; + mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString()); + } + + @AfterAll + static void tearDown() { + if (mongoClient != null) { + mongoClient.close(); + } + } + + @ParameterizedTest + @EnumSource(DeliveryGuarantee.class) + void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) + throws Exception { + final String index = "test-sink-with-delivery-" + deliveryGuarantee; + boolean failure = false; + try { + runTest(index, false, deliveryGuarantee, null); + } catch (IllegalArgumentException e) { + failure = true; + assertThat(deliveryGuarantee).isSameAs(DeliveryGuarantee.EXACTLY_ONCE); Review Comment: I still think we shouldn't even test exactly_once, apart from it being rejected (in a dedicated test). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
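Following up on the final review comment above: a dedicated test that only verifies the rejection of EXACTLY_ONCE could be as small as the sketch below. This is a minimal sketch, not the PR's implementation — the `MongoSink.<String>builder()` methods and the exact point at which the guarantee is rejected are assumptions; the only expectation grounded in the existing test is that an `IllegalArgumentException` is raised for `DeliveryGuarantee.EXACTLY_ONCE`.

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Sketch of a dedicated rejection test for the EXACTLY_ONCE delivery guarantee. */
class MongoSinkDeliveryGuaranteeTest {

    @Test
    void testExactlyOnceIsRejected() {
        // Builder method names are assumed for illustration; adjust them to the actual MongoSink API.
        // The rejection may happen in the setter or in build(); either way the assertion holds.
        assertThatThrownBy(
                        () ->
                                MongoSink.<String>builder()
                                        .setUri("mongodb://127.0.0.1:27017")
                                        .setDatabase("test_sink")
                                        .setCollection("test_coll")
                                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                                        .build())
                .isInstanceOf(IllegalArgumentException.class);
    }
}
```

With such a dedicated test in place, the parameterized `testWriteToMongoWithDeliveryGuarantee` could be restricted to NONE and AT_LEAST_ONCE, as the comment suggests.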
