LadyForest commented on a change in pull request #54:
URL: https://github.com/apache/flink-table-store/pull/54#discussion_r830827102



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(

Review comment:
       > Can we add this validation to `TableStore`? We can introduce an `adjustAndValidateKeys()` method that both `withPartition` and `withPrimaryKeys` call.
   
   You're right. This check should be performed within `TableStore`, because `TableStore` might be exposed to users directly in the future.
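
A minimal sketch of how that could look. Everything here is hypothetical: `TableStoreSketch` is a stand-in rather than the real `TableStore`, the builder signatures are assumed, and the rule being checked (primary key columns must include every partition key) is only a guess, since the diff above is cut off before the actual condition. Any "adjust" step is left out because the thread does not say what it would do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for TableStore; names and rule are assumptions. */
class TableStoreSketch {
    private List<String> partitionKeys = new ArrayList<>();
    private List<String> primaryKeys = new ArrayList<>();

    TableStoreSketch withPartition(List<String> keys) {
        this.partitionKeys = new ArrayList<>(keys);
        adjustAndValidateKeys();
        return this;
    }

    TableStoreSketch withPrimaryKeys(List<String> keys) {
        this.primaryKeys = new ArrayList<>(keys);
        adjustAndValidateKeys();
        return this;
    }

    List<String> getPrimaryKeys() {
        return primaryKeys;
    }

    // Runs after either setter, so the check holds regardless of call order.
    // Assumed rule: a non-empty primary key must cover all partition keys.
    private void adjustAndValidateKeys() {
        if (primaryKeys.isEmpty() || partitionKeys.isEmpty()) {
            return; // nothing to cross-check yet
        }
        if (!primaryKeys.containsAll(partitionKeys)) {
            throw new IllegalStateException(
                    String.format(
                            "Primary key %s must include all partition keys %s",
                            primaryKeys, partitionKeys));
        }
    }
}
```

Keeping the check inside the store means callers that bypass the factory still get the same validation, which is the point raised above about `TableStore` possibly being exposed to users directly.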





