JingsongLi commented on code in PR #383:
URL: https://github.com/apache/flink-table-store/pull/383#discussion_r1024684798


##########
docs/content/docs/development/create-table.md:
##########
@@ -296,6 +296,24 @@ Use approach one if you have a large number of filtered 
queries
 with only `user_id`, and use approach two if you have a large
 number of filtered queries with only `catalog_id`.
 
+The primary key data types currently supported by the table store

Review Comment:
   Maybe we don't need to document this.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -157,6 +159,33 @@ public TableSchema commitNewVersion(UpdateSchema 
updateSchema) throws Exception
         }
     }
 
+    private void validatePrimaryKeysType(UpdateSchema updateSchema, 
List<String> primaryKeys) {
+        if (!primaryKeys.isEmpty()) {
+            Map<String, RowType.RowField> rowFields = new HashMap<>();
+            for (RowType.RowField rowField : 
updateSchema.rowType().getFields()) {
+                rowFields.put(StringUtils.lowerCase(rowField.getName()), 
rowField);

Review Comment:
   Why use `lowerCase`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -157,6 +159,33 @@ public TableSchema commitNewVersion(UpdateSchema 
updateSchema) throws Exception
         }
     }
 
+    private void validatePrimaryKeysType(UpdateSchema updateSchema, 
List<String> primaryKeys) {
+        if (!primaryKeys.isEmpty()) {
+            Map<String, RowType.RowField> rowFields = new HashMap<>();
+            for (RowType.RowField rowField : 
updateSchema.rowType().getFields()) {
+                rowFields.put(StringUtils.lowerCase(rowField.getName()), 
rowField);
+            }
+            for (String primaryKeyName : primaryKeys) {
+                RowType.RowField rowField = 
rowFields.get(StringUtils.lowerCase(primaryKeyName));
+                LogicalType logicalType = rowField.getType();
+                if (TableSchema.PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES.stream()
+                        .anyMatch(c -> c.isInstance(logicalType))) {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Don't support to create primary key in 
[%s], the type of column[%s] is [%s]",

Review Comment:
   `The type %s in primary key field %s is unsupported`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to