This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e3a708d  [FLINK-26753] PK constraint should include partition keys if 
table is partitioned
e3a708d is described below

commit e3a708d8ce04b9831805a818baa3d09140024f42
Author: Jane Chan <[email protected]>
AuthorDate: Wed Mar 23 11:10:14 2022 +0800

    [FLINK-26753] PK constraint should include partition keys if table is 
partitioned
    
    This closes #54
---
 .../flink/table/store/connector/TableStore.java    |  41 ++++++++
 .../table/store/connector/TableStoreFactory.java   |  38 ++-----
 .../table/store/connector/CreateTableITCase.java   |   9 +-
 .../table/store/connector/DropTableITCase.java     |   5 +-
 .../table/store/connector/FileStoreITCase.java     |   6 +-
 .../store/connector/ReadWriteTableITCase.java      |   3 +-
 .../store/connector/TableStoreFactoryTest.java     | 111 +++++++++++++++------
 .../table/store/connector/TableStoreTestBase.java  |  11 +-
 .../store/connector/sink/LogStoreSinkITCase.java   |  15 ++-
 .../flink/table/store/file/FileStoreOptions.java   |  29 ++++--
 10 files changed, 184 insertions(+), 84 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a0a5859..a21ebfe 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -52,9 +53,11 @@ import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -100,11 +103,13 @@ public class TableStore {
 
     public TableStore withPartitions(int[] partitions) {
         this.partitions = partitions;
+        adjustIndexAndValidate();
         return this;
     }
 
     public TableStore withPrimaryKeys(int[] primaryKeys) {
         this.primaryKeys = primaryKeys;
+        adjustIndexAndValidate();
         return this;
     }
 
@@ -130,6 +135,12 @@ public class TableStore {
         return partitionType.getFieldNames();
     }
 
+    @VisibleForTesting
+    List<String> primaryKeys() {
+        RowType primaryKeyType = TypeUtils.project(type, primaryKeys);
+        return primaryKeyType.getFieldNames();
+    }
+
     public Configuration logOptions() {
         return new DelegatingConfiguration(options, LOG_PREFIX);
     }
@@ -171,6 +182,36 @@ public class TableStore {
                 tableIdentifier, options, user, partitionType, keyType, 
valueType, mergeFunction);
     }
 
+    private void adjustIndexAndValidate() {
+        if (primaryKeys.length > 0 && partitions.length > 0) {
+            List<Integer> pkList = 
Arrays.stream(primaryKeys).boxed().collect(Collectors.toList());
+            List<Integer> partitionList =
+                    
Arrays.stream(partitions).boxed().collect(Collectors.toList());
+
+            String pkInfo =
+                    type == null
+                            ? pkList.toString()
+                            : TypeUtils.project(type, 
primaryKeys).getFieldNames().toString();
+            String partitionInfo =
+                    type == null
+                            ? partitionList.toString()
+                            : TypeUtils.project(type, 
partitions).getFieldNames().toString();
+            Preconditions.checkState(
+                    pkList.containsAll(partitionList),
+                    String.format(
+                            "Primary key constraint %s should include all 
partition fields %s",
+                            pkInfo, partitionInfo));
+            primaryKeys =
+                    Arrays.stream(primaryKeys).filter(pk -> 
!partitionList.contains(pk)).toArray();
+
+            Preconditions.checkState(
+                    primaryKeys.length > 0,
+                    String.format(
+                            "Primary key constraint %s should not be same with 
partition fields %s, this will result in only one record in a partition",
+                            pkInfo, partitionInfo));
+        }
+    }
+
     /** Source builder to build a flink {@link Source}. */
     public class SourceBuilder {
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index cbe6f8a..4cd73a8 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -45,12 +44,12 @@ import 
org.apache.flink.table.store.log.LogOptions.LogConsistency;
 import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -58,7 +57,6 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
 import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
 import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
@@ -88,7 +86,7 @@ public class TableStoreFactory
     @Override
     public void onCreateTable(Context context, boolean ignoreIfExists) {
         Map<String, String> options = context.getCatalogTable().getOptions();
-        Path path = tablePath(options, context.getObjectIdentifier());
+        Path path = FileStoreOptions.path(options, 
context.getObjectIdentifier());
         try {
             if (path.getFileSystem().exists(path) && !ignoreIfExists) {
                 throw new TableException(
@@ -125,7 +123,7 @@ public class TableStoreFactory
     @Override
     public void onDropTable(Context context, boolean ignoreIfNotExists) {
         Map<String, String> options = context.getCatalogTable().getOptions();
-        Path path = tablePath(options, context.getObjectIdentifier());
+        Path path = FileStoreOptions.path(options, 
context.getObjectIdentifier());
         try {
             if (path.getFileSystem().exists(path)) {
                 path.getFileSystem().delete(path, true);
@@ -241,32 +239,14 @@ public class TableStoreFactory
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier 
identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting 
session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root 
dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' 
= '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) 
schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            pkIndex =
                     schema.getPrimaryKey().get().getColumns().stream()
                             .mapToInt(rowType.getFieldNames()::indexOf)
                             .toArray();
@@ -274,9 +254,9 @@ public class TableStoreFactory
         return new TableStore(Configuration.fromMap(catalogTable.getOptions()))
                 .withTableIdentifier(context.getObjectIdentifier())
                 .withSchema(rowType)
-                .withPrimaryKeys(primaryKeys)
+                .withPrimaryKeys(pkIndex)
                 .withPartitions(
-                        catalogTable.getPartitionKeys().stream()
+                        partitionKeys.stream()
                                 .mapToInt(rowType.getFieldNames()::indexOf)
                                 .toArray());
     }
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
index 29698a7..26f3c1c 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -78,7 +79,9 @@ public class CreateTableITCase extends TableStoreTestBase {
             assertThat(((TableEnvironmentImpl) 
tEnv).getCatalogManager().getTable(tableIdentifier))
                     .isPresent();
             // check table store
-            assertThat(Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+            assertThat(
+                            Paths.get(rootPath, 
FileStoreOptions.relativeTablePath(tableIdentifier))
+                                    .toFile())
                     .exists();
             // check log store
             
assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore);
@@ -128,7 +131,9 @@ public class CreateTableITCase extends TableStoreTestBase {
             }
         } else if (expectedResult.expectedMessage.startsWith("Failed to create 
file store path.")) {
             // failed when creating file store
-            Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
+            Paths.get(rootPath, 
FileStoreOptions.relativeTablePath(tableIdentifier))
+                    .toFile()
+                    .mkdirs();
         } else if (expectedResult.expectedMessage.startsWith("Failed to create 
kafka topic.")) {
             // failed when creating log store
             createTopicIfNotExists(tableIdentifier.asSummaryString(), 
BUCKET.defaultValue());
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
index 2e0f1fc..eb65439 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -79,7 +80,9 @@ public class DropTableITCase extends TableStoreTestBase {
             assertThat(((TableEnvironmentImpl) 
tEnv).getCatalogManager().getTable(tableIdentifier))
                     .isNotPresent();
             // check table store
-            assertThat(Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+            assertThat(
+                            Paths.get(rootPath, 
FileStoreOptions.relativeTablePath(tableIdentifier))
+                                    .toFile())
                     .doesNotExist();
             // check log store
             
assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index a0d5185..95e1f9b 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -117,7 +117,7 @@ public class FileStoreITCase extends AbstractTestBase {
 
     @Test
     public void testPartitioned() throws Exception {
-        store.withPartitions(new int[] {1});
+        store.withPrimaryKeys(new int[] {1, 2}).withPartitions(new int[] {1});
 
         // write
         store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
@@ -153,7 +153,7 @@ public class FileStoreITCase extends AbstractTestBase {
     @Test
     public void testOverwrite() throws Exception {
         Assume.assumeTrue(isBatch);
-        store.withPartitions(new int[] {1});
+        store.withPrimaryKeys(new int[] {1, 2}).withPartitions(new int[] {1});
 
         // write
         store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
@@ -193,7 +193,7 @@ public class FileStoreITCase extends AbstractTestBase {
 
     @Test
     public void testPartitionedNonKey() throws Exception {
-        store.withPartitions(new int[] {1}).withPrimaryKeys(new int[0]);
+        store.withPrimaryKeys(new int[0]).withPartitions(new int[] {1});
 
         // write
         store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 3bdf8cc..819e018 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
@@ -86,7 +87,7 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
                 }
             }
             
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedResult.expectedRecords);
-            String relativeFilePath = 
getRelativeFileStoreTablePath(tableIdentifier);
+            String relativeFilePath = 
FileStoreOptions.relativeTablePath(tableIdentifier);
             // check snapshot file path
             assertThat(Paths.get(rootPath, relativeFilePath, 
"snapshot")).exists();
             // check manifest file path
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
index 2e894bd..bfde9aa 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -27,8 +27,9 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -39,15 +40,19 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
 import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static 
org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
 import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
@@ -60,7 +65,7 @@ public class TableStoreFactoryTest {
     private static final ObjectIdentifier TABLE_IDENTIFIER =
             ObjectIdentifier.of("catalog", "database", "table");
 
-    private final ManagedTableFactory tableStoreFactory = new 
TableStoreFactory();
+    private final TableStoreFactory tableStoreFactory = new 
TableStoreFactory();
 
     @TempDir private static java.nio.file.Path sharedTempDir;
     private DynamicTableFactory.Context context;
@@ -84,11 +89,7 @@ public class TableStoreFactoryTest {
         Path expectedPath =
                 Paths.get(
                         sharedTempDir.toAbsolutePath().toString(),
-                        String.format(
-                                "root/%s.catalog/%s.db/%s",
-                                TABLE_IDENTIFIER.getCatalogName(),
-                                TABLE_IDENTIFIER.getDatabaseName(),
-                                TABLE_IDENTIFIER.getObjectName()));
+                        relativeTablePath(TABLE_IDENTIFIER));
         boolean exist = expectedPath.toFile().exists();
         if (ignoreIfExists || !exist) {
             tableStoreFactory.onCreateTable(context, ignoreIfExists);
@@ -119,11 +120,7 @@ public class TableStoreFactoryTest {
         Path expectedPath =
                 Paths.get(
                         sharedTempDir.toAbsolutePath().toString(),
-                        String.format(
-                                "root/%s.catalog/%s.db/%s",
-                                TABLE_IDENTIFIER.getCatalogName(),
-                                TABLE_IDENTIFIER.getDatabaseName(),
-                                TABLE_IDENTIFIER.getObjectName()));
+                        relativeTablePath(TABLE_IDENTIFIER));
         boolean exist = expectedPath.toFile().exists();
         if (exist || ignoreIfNotExists) {
             tableStoreFactory.onDropTable(context, ignoreIfNotExists);
@@ -161,23 +158,41 @@ public class TableStoreFactoryTest {
                 .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
     }
 
-    @Test
-    public void testTablePath() {
-        Map<String, String> options = of(FILE_PATH.key(), "dummy:/foo/bar");
-        assertThat(TableStoreFactory.tablePath(options, TABLE_IDENTIFIER))
-                .isEqualTo(
-                        new org.apache.flink.core.fs.Path(
-                                
"dummy:/foo/bar/root/catalog.catalog/database.db/table"));
-
-        assertThatThrownBy(
-                        () -> 
TableStoreFactory.tablePath(Collections.emptyMap(), TABLE_IDENTIFIER))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting 
session level configuration "
-                                + "as `SET 'table-store.file.path' = '...'`. "
-                                + "Alternatively, you can use a per-table root 
dir "
-                                + "as `CREATE TABLE ${table} (...) WITH 
('file.path' = '...')`");
+    @ParameterizedTest
+    @MethodSource("providingResolvedTable")
+    public void testBuildTableStore(
+            RowType rowType,
+            List<String> partitions,
+            int[] pkIndex,
+            TableStoreTestBase.ExpectedResult expectedResult) {
+        ResolvedCatalogTable catalogTable =
+                createResolvedTable(Collections.emptyMap(), rowType, 
partitions, pkIndex);
+        context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        TABLE_IDENTIFIER,
+                        catalogTable,
+                        Collections.emptyMap(),
+                        Configuration.fromMap(Collections.emptyMap()),
+                        Thread.currentThread().getContextClassLoader(),
+                        false);
+        if (expectedResult.success) {
+            TableStore tableStore = tableStoreFactory.buildTableStore(context);
+            
assertThat(tableStore.partitioned()).isEqualTo(catalogTable.isPartitioned());
+            assertThat(tableStore.valueCountMode())
+                    
.isEqualTo(catalogTable.getResolvedSchema().getPrimaryKeyIndexes().length == 0);
+
+            // check primary key doesn't contain partition
+            if (tableStore.partitioned() && !tableStore.valueCountMode()) {
+                assertThat(
+                                tableStore.primaryKeys().stream()
+                                        .noneMatch(pk -> 
tableStore.partitionKeys().contains(pk)))
+                        .isTrue();
+            }
+        } else {
+            assertThatThrownBy(() -> 
tableStoreFactory.buildTableStore(context))
+                    .isInstanceOf(expectedResult.expectedType)
+                    .hasMessageContaining(expectedResult.expectedMessage);
+        }
     }
 
     // ~ Tools 
------------------------------------------------------------------
@@ -256,6 +271,44 @@ public class TableStoreFactoryTest {
                 Arguments.of(enrichedOptions, false));
     }
 
+    private static Stream<Arguments> providingResolvedTable() {
+        RowType rowType = TestKeyValueGenerator.ROW_TYPE;
+        // success case
+        Arguments arg0 =
+                Arguments.of(
+                        rowType,
+                        Arrays.asList("dt", "hr"),
+                        new int[] {0, 1, 2}, // pk is [dt, hr, shopId]
+                        new TableStoreTestBase.ExpectedResult().success(true));
+
+        // failed case: pk doesn't contain partition key
+        Arguments arg1 =
+                Arguments.of(
+                        rowType,
+                        Arrays.asList("dt", "hr"),
+                        new int[] {2}, // pk is [shopId]
+                        new TableStoreTestBase.ExpectedResult()
+                                .success(false)
+                                .expectedType(IllegalStateException.class)
+                                .expectedMessage(
+                                        "Primary key constraint [shopId] 
should include all partition fields [dt, hr]"));
+
+        // failed case: pk is same as partition key
+        Arguments arg2 =
+                Arguments.of(
+                        rowType,
+                        Arrays.asList("dt", "hr", "shopId"),
+                        new int[] {0, 1, 2}, // pk is [dt, hr, shopId]
+                        new TableStoreTestBase.ExpectedResult()
+                                .success(false)
+                                .expectedType(IllegalStateException.class)
+                                .expectedMessage(
+                                        "Primary key constraint [dt, hr, 
shopId] should not be same with partition fields [dt, hr, shopId],"
+                                                + " this will result in only 
one record in a partition"));
+
+        return Stream.of(arg0, arg1, arg2);
+    }
+
     private static Map<String, String> addPrefix(
             Map<String, String> options, String prefix, Predicate<String> 
predicate) {
         Map<String, String> newOptions = new HashMap<>();
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index e3f5f1c..0fe91af 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -146,15 +147,7 @@ public abstract class TableStoreTestBase extends 
KafkaTableTestBase {
 
     protected void deleteTablePath() {
         FileUtils.deleteQuietly(
-                Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile());
-    }
-
-    protected static String getRelativeFileStoreTablePath(ObjectIdentifier 
tableIdentifier) {
-        return String.format(
-                "root/%s.catalog/%s.db/%s",
-                tableIdentifier.getCatalogName(),
-                tableIdentifier.getDatabaseName(),
-                tableIdentifier.getObjectName());
+                Paths.get(rootPath, 
FileStoreOptions.relativeTablePath(tableIdentifier)).toFile());
     }
 
     /** Expected result wrapper. */
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 38a4b5a..d12848f 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -85,17 +85,24 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
     }
 
     private void innerTest(
-            String name, boolean isBatch, boolean partitioned, boolean 
transaction, boolean keyed)
+            String name, boolean isBatch, boolean partitioned, boolean 
transaction, boolean hasPk)
             throws Exception {
         StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : 
buildStreamEnv();
 
         // in eventual mode, failure will result in duplicate data
         TableStore store = buildTableStore(isBatch || !transaction, 
TEMPORARY_FOLDER);
         if (partitioned) {
+            if (hasPk) {
+                store.withPrimaryKeys(new int[] {1, 2});
+            } else {
+                store.withPrimaryKeys(new int[0]);
+            }
             store.withPartitions(new int[] {1});
+        } else {
+            store.withPartitions(new int[0]);
         }
 
-        if (!keyed) {
+        if (!hasPk) {
             store.withPrimaryKeys(new int[0]);
         }
 
@@ -109,7 +116,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
                                 ? LogOptions.LogConsistency.TRANSACTIONAL
                                 : LogOptions.LogConsistency.EVENTUAL,
                         TABLE_TYPE,
-                        keyed ? new int[] {2} : new int[0]);
+                        hasPk ? new int[] {2} : new int[0]);
 
         KafkaLogStoreFactory factory = discoverKafkaLogFactory();
         KafkaLogSinkProvider sinkProvider = 
factory.createSinkProvider(context, SINK_CONTEXT);
@@ -130,7 +137,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
             List<Row> results = 
executeAndCollect(store.sourceBuilder().build(env));
 
             Row[] expected;
-            if (keyed) {
+            if (hasPk) {
                 expected =
                         partitioned
                                 ? new Row[] {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index aec50ed..c4b9c38 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -26,10 +26,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
@@ -129,13 +131,28 @@ public class FileStoreOptions implements Serializable {
     }
 
     public Path path(ObjectIdentifier tableIdentifier) {
-        return new Path(
-                options.get(FILE_PATH),
+        return path(options.toMap(), tableIdentifier);
+    }
+
+    public static Path path(Map<String, String> options, ObjectIdentifier 
tableIdentifier) {
+        Preconditions.checkArgument(
+                options.containsKey(FILE_PATH.key()),
                 String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        tableIdentifier.getCatalogName(),
-                        tableIdentifier.getDatabaseName(),
-                        tableIdentifier.getObjectName()));
+                        "Failed to create file store path. "
+                                + "Please specify a root dir by setting 
session level configuration "
+                                + "as `SET 'table-store.%s' = '...'`. "
+                                + "Alternatively, you can use a per-table root 
dir "
+                                + "as `CREATE TABLE ${table} (...) WITH ('%s' 
= '...')`",
+                        FILE_PATH.key(), FILE_PATH.key()));
+        return new Path(options.get(FILE_PATH.key()), 
relativeTablePath(tableIdentifier));
+    }
+
+    public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+        return String.format(
+                "%s.catalog/%s.db/%s",
+                tableIdentifier.getCatalogName(),
+                tableIdentifier.getDatabaseName(),
+                tableIdentifier.getObjectName());
     }
 
     public FileFormat fileFormat() {

Reply via email to