Jiabao-Sun commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1018264629


##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoValidationUtils.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utility methods for validating MongoDB properties. */
+@Internal
+public class MongoValidationUtils {
+    private static final Set<LogicalTypeRoot> ALLOWED_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+    static {
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.CHAR);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARCHAR);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BOOLEAN);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DECIMAL);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TINYINT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.SMALLINT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTEGER);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BIGINT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.FLOAT);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DOUBLE);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTERVAL_YEAR_MONTH);
+        ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTERVAL_DAY_TIME);
+    }
+
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In MongoDB the
+     * primary key is used to calculate the MongoDB document id, which is a string of up to 1024
+     * bytes. It cannot have whitespaces. As of now it is calculated by concatenating the fields.

Review Comment:
   Hi @zentol,
   
   Thanks for pointing out the inaccuracies here. I have updated the Javadoc to describe in detail how the primary key is extracted.
   
   ------------
   
   This method checks that the table does not define a primary key on illegal types. The illegal types are mainly the `LogicalTypeFamily.COLLECTION` types, the `LogicalTypeRoot.RAW` type, and any other types that `RowDataToBsonConverters` cannot convert to a `BsonType`.
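   
   A minimal, self-contained sketch of this kind of allow-list check. It assumes a hypothetical `KeyTypeRoot` enum that mirrors a subset of Flink's `LogicalTypeRoot`, and it throws `IllegalArgumentException` in place of Flink's `ValidationException` so that it runs without Flink on the classpath. It is an illustration, not the connector's actual code.
   
   ```java
   import java.util.EnumSet;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;

   public class PrimaryKeyTypeCheck {

       // Hypothetical stand-in for a subset of Flink's LogicalTypeRoot.
       enum KeyTypeRoot { CHAR, VARCHAR, BOOLEAN, DECIMAL, INTEGER, BIGINT, DOUBLE, ARRAY, MAP, MULTISET, RAW }

       // Allow-list mirroring part of ALLOWED_PRIMARY_KEY_TYPES from the diff.
       static final Set<KeyTypeRoot> ALLOWED = EnumSet.of(
               KeyTypeRoot.CHAR, KeyTypeRoot.VARCHAR, KeyTypeRoot.BOOLEAN, KeyTypeRoot.DECIMAL,
               KeyTypeRoot.INTEGER, KeyTypeRoot.BIGINT, KeyTypeRoot.DOUBLE);

       /** Throws if any primary key column (name -> type root) is outside the allow-list. */
       static void validatePrimaryKey(Map<String, KeyTypeRoot> pkColumns) {
           StringBuilder illegal = new StringBuilder();
           for (Map.Entry<String, KeyTypeRoot> e : pkColumns.entrySet()) {
               if (!ALLOWED.contains(e.getValue())) {
                   if (illegal.length() > 0) {
                       illegal.append(", ");
                   }
                   illegal.append(e.getKey()).append(' ').append(e.getValue());
               }
           }
           if (illegal.length() > 0) {
               throw new IllegalArgumentException("Unsupported primary key types: " + illegal);
           }
       }

       public static void main(String[] args) {
           Map<String, KeyTypeRoot> ok = new LinkedHashMap<>();
           ok.put("f1", KeyTypeRoot.BIGINT);
           ok.put("f2", KeyTypeRoot.VARCHAR);
           validatePrimaryKey(ok); // passes silently

           Map<String, KeyTypeRoot> bad = new LinkedHashMap<>();
           bad.put("tags", KeyTypeRoot.ARRAY);
           try {
               validatePrimaryKey(bad);
           } catch (IllegalArgumentException ex) {
               System.out.println(ex.getMessage()); // Unsupported primary key types: tags ARRAY
           }
       }
   }
   ```
   
   An allow-list (rather than a deny-list of collection and RAW types) means any newly added logical type is rejected until someone decides how it maps to BSON.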
   
   In MongoDB, the primary key is used to compute the document's `_id`, which may be of any `BsonType` except `BsonType.ARRAY` (the MongoDB documentation also excludes regular expressions and `undefined`). Its value must be unique within the collection and is immutable.
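   
   A rough sketch of the `_id` type rule applied to plain Java values. It assumes the MongoDB Java driver's default mapping of Java types to BSON (collections and non-`byte[]` arrays to BSON arrays, `byte[]` to binary, `java.util.regex.Pattern` to regular expressions); `isUsableAsId` is a hypothetical helper and checks only the array and regex cases.
   
   ```java
   import java.util.Collection;
   import java.util.List;
   import java.util.regex.Pattern;

   public class IdValueCheck {

       /**
        * Hypothetical check: returns false for values that would be encoded as a BSON array
        * or a BSON regular expression. All other values are accepted; no other rules are checked.
        */
       static boolean isUsableAsId(Object value) {
           // byte[] is excluded because it maps to BSON binary, which is a valid _id type.
           boolean isArrayLike = value instanceof Collection
                   || (value != null && value.getClass().isArray() && !(value instanceof byte[]));
           if (isArrayLike) {
               return false;
           }
           return !(value instanceof Pattern);
       }

       public static void main(String[] args) {
           System.out.println(isUsableAsId(42L));                   // true
           System.out.println(isUsableAsId("user-1"));              // true
           System.out.println(isUsableAsId(List.of(1, 2)));         // false
           System.out.println(isUsableAsId(Pattern.compile("^a"))); // false
       }
   }
   ```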
   
   MongoDB creates a unique index on the `_id` field when a collection is created, and that index is subject to some constraints. For details, see [Index Key Limit](https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Index-Key-Limit).
   
   - Before MongoDB 4.2, the total size of an index entry, which can include structural overhead depending on the BSON type, must be less than 1024 bytes.
   - Starting in version 4.2, MongoDB removes the Index Key Limit (for deployments whose featureCompatibilityVersion is 4.2 or greater).
   
   Currently, the `_id` is extracted by `MongoKeyExtractor` from the primary key declared in the Flink table schema.
   
   - When the primary key has a single field, we convert that field's value to a BSON value and use it as the `_id` of the corresponding document.
   - When the primary key has multiple fields, we convert each field and combine them into a BSON document (object) that serves as the `_id` of the corresponding document.
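   
   The two cases above can be sketched in plain Java as follows. This is a simplified illustration, not `MongoKeyExtractor` itself: a `Map<String, Object>` is a hypothetical stand-in for Flink's `RowData`, a `LinkedHashMap` stands in for the BSON document, and `extractId` is a hypothetical name.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class IdExtractionSketch {

       /**
        * Builds the _id from a row, given the ordered primary key field names.
        * Single-field key: the field value itself. Multi-field key: an ordered map
        * of field name to value (standing in for a BSON embedded document).
        */
       static Object extractId(Map<String, Object> row, List<String> primaryKey) {
           if (primaryKey.isEmpty()) {
               throw new IllegalArgumentException("No primary key defined");
           }
           if (primaryKey.size() == 1) {
               return row.get(primaryKey.get(0));
           }
           Map<String, Object> composite = new LinkedHashMap<>();
           for (String field : primaryKey) {
               composite.put(field, row.get(field));
           }
           return composite;
       }

       public static void main(String[] args) {
           Map<String, Object> row = new LinkedHashMap<>();
           row.put("f1", 1L);
           row.put("f2", "v2");
           row.put("f3", 3.0f);

           System.out.println(extractId(row, List.of("f1")));       // 1
           System.out.println(extractId(row, List.of("f1", "f2"))); // {f1=1, f2=v2}
       }
   }
   ```
   
   Keeping the key fields in declaration order matters for the composite case: MongoDB compares embedded documents field by field in order, so `{f1: 1, f2: "v2"}` and `{f2: "v2", f1: 1}` would be different `_id` values.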
   
   For example, given a primary key with a single field:
   ```sql
   CREATE TABLE T1 (
     f1 BIGINT,
     f2 STRING,
     f3 FLOAT,
     PRIMARY KEY (f1) NOT ENFORCED
   ) WITH (
     'connector' = 'mongodb'
   );
   ```
   The resulting document will look like:
   ```javascript
   {
     _id: 1,
     f2: "v2",
     f3: 3.0
   }
   ```
   __________
   
   Given a primary key with multiple fields:
   ```sql
   CREATE TABLE T2 (
     f1 BIGINT,
     f2 STRING,
     f3 FLOAT,
     PRIMARY KEY (f1, f2) NOT ENFORCED
   ) WITH (
     'connector' = 'mongodb'
   );
   ```
   The resulting document will look like:
   ```javascript
   {
     // object type is acceptable for _id in MongoDB
     _id: {
       f1: 1,
       f2: "v2"
     },
     f2: "v2",
     f3: 3.0
   }
   ```
   
   ----------
   
   As for why we combine the fields into an object-typed `_id` instead of concatenating them into a string as the Elasticsearch connector does: this way we don't need to handle string conversion for every supported type.
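   
   A small hypothetical comparison illustrating that trade-off. `concatenatedId` joins key values with a delimiter (roughly the string-concatenation approach), while `compositeId` keeps each value with its original Java type in an ordered map, standing in for the BSON document that would become `_id`. `String.valueOf` is only a placeholder for the per-type formatting a real implementation would have to define; both method names are illustrative.
   
   ```java
   import java.math.BigDecimal;
   import java.time.LocalDateTime;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.StringJoiner;

   public class ConcatVsComposite {

       /** String-concatenated key: every value must first be rendered as a string. */
       static String concatenatedId(List<Object> keyValues, String delimiter) {
           StringJoiner joiner = new StringJoiner(delimiter);
           for (Object value : keyValues) {
               // Placeholder conversion; a connector would need a deliberate format
               // for each type (decimals, timestamps, intervals, ...).
               joiner.add(String.valueOf(value));
           }
           return joiner.toString();
       }

       /** Composite key: values are kept unchanged, mapped by field name in key order. */
       static Map<String, Object> compositeId(List<String> names, List<Object> keyValues) {
           Map<String, Object> id = new LinkedHashMap<>();
           for (int i = 0; i < names.size(); i++) {
               id.put(names.get(i), keyValues.get(i));
           }
           return id;
       }

       public static void main(String[] args) {
           List<Object> values = List.of(1L, new BigDecimal("3.10"), LocalDateTime.of(2022, 11, 9, 10, 0));
           List<String> names = List.of("f1", "f2", "f3");

           System.out.println(concatenatedId(values, "_")); // 1_3.10_2022-11-09T10:00
           Map<String, Object> id = compositeId(names, values);
           System.out.println(id.get("f3") instanceof LocalDateTime); // true: the type is preserved
       }
   }
   ```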
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

