JingsongLi commented on a change in pull request #54:
URL: https://github.com/apache/flink-table-store/pull/54#discussion_r830799765



##########
File path: 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier 
identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting 
session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root 
dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' 
= '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) 
schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(

Review comment:
       Can we add this validation to `TableStore`?
   We can introduce an `adjustAndValidateKeys()` method.
   `withPartition` and `withPrimaryKeys` will call this method.

##########
File path: 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier 
identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting 
session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root 
dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' 
= '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) 
schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(
+                    new HashSet<>(pkCols).containsAll(partitionKeys),
+                    String.format(
+                            "Primary key constraint %s should include 
partition key %s",

Review comment:
       Primary key constraint %s should include all partition fields %s

##########
File path: 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier 
identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting 
session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root 
dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' 
= '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) 
schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(
+                    new HashSet<>(pkCols).containsAll(partitionKeys),
+                    String.format(
+                            "Primary key constraint %s should include 
partition key %s",
+                            pkCols, partitionKeys));
+            Set<String> partFilter = new HashSet<>(partitionKeys);
+            pkIndex =
                     schema.getPrimaryKey().get().getColumns().stream()
+                            .filter(pk -> !partFilter.contains(pk))
                             .mapToInt(rowType.getFieldNames()::indexOf)
                             .toArray();
+            if (pkIndex.length == 0) {
+                throw new TableException(
+                        String.format(
+                                "Primary key constraint %s should not be same 
with partition key %s",

Review comment:
       Primary key constraint %s should not be the same as partition fields %s, 
as this will result in only one record in a partition




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to