Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1018264629
##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoValidationUtils.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utility methods for validating MongoDB properties. */
+@Internal
+public class MongoValidationUtils {
+    private static final Set<LogicalTypeRoot> ALLOWED_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+    static {
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.CHAR);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARCHAR);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BOOLEAN);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DECIMAL);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TINYINT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.SMALLINT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTEGER);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BIGINT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.FLOAT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DOUBLE);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTERVAL_YEAR_MONTH);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTERVAL_DAY_TIME);
+    }
+
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In MongoDB the
+     * primary key is used to calculate the MongoDB document id, which is a string of up to 1024
+     * bytes. It cannot have whitespaces. As of now it is calculated by concatenating the fields.

Review Comment:
Hi @zentol,
Thanks for pointing out the inaccuracies here. The javadocs have been modified to describe in detail how primary keys are extracted.

------------

The method checks that the table does not have a primary key defined on illegal types. The illegal types are mostly LogicalTypeFamily.COLLECTION types, the LogicalTypeRoot.RAW type, and other types that cannot be converted to a BsonType by `RowDataToBsonConverters`.

In MongoDB the primary key is used to calculate the MongoDB document _id, which may be of any BsonType other than BsonType.ARRAY. Its value must be unique and immutable in the collection.
MongoDB creates a unique index on the _id field when a collection is created. This index also places some constraints on the primary key. For details, see [Index Key Limit](https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Index-Key-Limit).
- Before MongoDB 4.2, the total size of an index entry, which can include structural overhead depending on the BSON type, must be less than 1024 bytes.
- Starting in version 4.2, MongoDB removes the Index Key Limit.

As of now, the _id is extracted by `MongoKeyExtractor` according to the primary key specified in the Flink table schema.
- When the primary key has only a single field, we convert the field's data to a BSON value and use it as the _id of the corresponding document.
- When the primary key has multiple fields, we convert these fields and combine them into a BSON object that is used as the _id of the corresponding document.

For example, if the table has a primary key with a single field:

```sql
CREATE TABLE T1 (
  f1 BIGINT,
  f2 STRING,
  f3 FLOAT,
  PRIMARY KEY (f1) NOT ENFORCED
) WITH (
  'connector' = 'mongodb'
);
```

The extracted _id will have a form like:

```javascript
{
  _id: 1,
  f2: "v2",
  f3: 3.0
}
```

__________

If the table has a primary key with multiple fields:

```sql
CREATE TABLE T2 (
  f1 BIGINT,
  f2 STRING,
  f3 FLOAT,
  PRIMARY KEY (f1, f2) NOT ENFORCED
) WITH (
  'connector' = 'mongodb'
);
```

The extracted _id will have a form like:

```javascript
{
  // object type is acceptable for _id in MongoDB
  _id: { f1: 1, f2: "v2" },
  f2: "v2",
  f3: 3.0
}
```

----------

As for why we use the object type to combine fields into the _id instead of concatenating them into a string as ES does: with an object we don't need to handle the conversion of every type to a string.
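
To make the single-field versus multi-field rule concrete, here is a minimal sketch in plain Java. It is not the connector's `MongoKeyExtractor`: the class `KeyExtractionSketch`, the method `extractId`, and the use of a `Map` to stand in for a row are all hypothetical, and no BSON conversion is performed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; this is not the connector's MongoKeyExtractor. */
final class KeyExtractionSketch {

    /**
     * Derives the _id value from a row, modelled here as a field-name-to-value map.
     * A single key field yields the bare value; several key fields yield a nested object.
     */
    static Object extractId(Map<String, Object> row, List<String> primaryKey) {
        if (primaryKey.isEmpty()) {
            throw new IllegalArgumentException("primary key must name at least one field");
        }
        if (primaryKey.size() == 1) {
            // Single field: the value itself becomes the _id.
            return row.get(primaryKey.get(0));
        }
        // Multiple fields: combine them, in key order, into one object used as the _id.
        Map<String, Object> compositeId = new LinkedHashMap<>();
        for (String field : primaryKey) {
            compositeId.put(field, row.get(field));
        }
        return compositeId;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("f1", 1L);
        row.put("f2", "v2");
        row.put("f3", 3.0f);

        System.out.println(extractId(row, List.of("f1")));       // like table T1
        System.out.println(extractId(row, List.of("f1", "f2"))); // like table T2
    }
}
```

Because the composite case keeps each value in its own typed slot, no per-type string formatting is needed, which is the design point made above.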