This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e3a708d [FLINK-26753] PK constraint should include partition keys if
table is partitioned
e3a708d is described below
commit e3a708d8ce04b9831805a818baa3d09140024f42
Author: Jane Chan <[email protected]>
AuthorDate: Wed Mar 23 11:10:14 2022 +0800
[FLINK-26753] PK constraint should include partition keys if table is
partitioned
This closes #54
---
.../flink/table/store/connector/TableStore.java | 41 ++++++++
.../table/store/connector/TableStoreFactory.java | 38 ++-----
.../table/store/connector/CreateTableITCase.java | 9 +-
.../table/store/connector/DropTableITCase.java | 5 +-
.../table/store/connector/FileStoreITCase.java | 6 +-
.../store/connector/ReadWriteTableITCase.java | 3 +-
.../store/connector/TableStoreFactoryTest.java | 111 +++++++++++++++------
.../table/store/connector/TableStoreTestBase.java | 11 +-
.../store/connector/sink/LogStoreSinkITCase.java | 15 ++-
.../flink/table/store/file/FileStoreOptions.java | 29 ++++--
10 files changed, 184 insertions(+), 84 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a0a5859..a21ebfe 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -52,9 +53,11 @@ import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -100,11 +103,13 @@ public class TableStore {
public TableStore withPartitions(int[] partitions) {
this.partitions = partitions;
+ adjustIndexAndValidate();
return this;
}
public TableStore withPrimaryKeys(int[] primaryKeys) {
this.primaryKeys = primaryKeys;
+ adjustIndexAndValidate();
return this;
}
@@ -130,6 +135,12 @@ public class TableStore {
return partitionType.getFieldNames();
}
+ @VisibleForTesting
+ List<String> primaryKeys() {
+ RowType primaryKeyType = TypeUtils.project(type, primaryKeys);
+ return primaryKeyType.getFieldNames();
+ }
+
public Configuration logOptions() {
return new DelegatingConfiguration(options, LOG_PREFIX);
}
@@ -171,6 +182,36 @@ public class TableStore {
tableIdentifier, options, user, partitionType, keyType,
valueType, mergeFunction);
}
+ private void adjustIndexAndValidate() {
+ if (primaryKeys.length > 0 && partitions.length > 0) {
+ List<Integer> pkList =
Arrays.stream(primaryKeys).boxed().collect(Collectors.toList());
+ List<Integer> partitionList =
+
Arrays.stream(partitions).boxed().collect(Collectors.toList());
+
+ String pkInfo =
+ type == null
+ ? pkList.toString()
+ : TypeUtils.project(type,
primaryKeys).getFieldNames().toString();
+ String partitionInfo =
+ type == null
+ ? partitionList.toString()
+ : TypeUtils.project(type,
partitions).getFieldNames().toString();
+ Preconditions.checkState(
+ pkList.containsAll(partitionList),
+ String.format(
+ "Primary key constraint %s should include all
partition fields %s",
+ pkInfo, partitionInfo));
+ primaryKeys =
+ Arrays.stream(primaryKeys).filter(pk ->
!partitionList.contains(pk)).toArray();
+
+ Preconditions.checkState(
+ primaryKeys.length > 0,
+ String.format(
+ "Primary key constraint %s should not be same with
partition fields %s, this will result in only one record in a partition",
+ pkInfo, partitionInfo));
+ }
+ }
+
/** Source builder to build a flink {@link Source}. */
public class SourceBuilder {
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index cbe6f8a..4cd73a8 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -45,12 +44,12 @@ import
org.apache.flink.table.store.log.LogOptions.LogConsistency;
import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -58,7 +57,6 @@ import java.util.stream.Collectors;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
@@ -88,7 +86,7 @@ public class TableStoreFactory
@Override
public void onCreateTable(Context context, boolean ignoreIfExists) {
Map<String, String> options = context.getCatalogTable().getOptions();
- Path path = tablePath(options, context.getObjectIdentifier());
+ Path path = FileStoreOptions.path(options,
context.getObjectIdentifier());
try {
if (path.getFileSystem().exists(path) && !ignoreIfExists) {
throw new TableException(
@@ -125,7 +123,7 @@ public class TableStoreFactory
@Override
public void onDropTable(Context context, boolean ignoreIfNotExists) {
Map<String, String> options = context.getCatalogTable().getOptions();
- Path path = tablePath(options, context.getObjectIdentifier());
+ Path path = FileStoreOptions.path(options,
context.getObjectIdentifier());
try {
if (path.getFileSystem().exists(path)) {
path.getFileSystem().delete(path, true);
@@ -241,32 +239,14 @@ public class TableStoreFactory
}
@VisibleForTesting
- static Path tablePath(Map<String, String> options, ObjectIdentifier
identifier) {
- Preconditions.checkArgument(
- options.containsKey(FILE_PATH.key()),
- String.format(
- "Failed to create file store path. "
- + "Please specify a root dir by setting
session level configuration "
- + "as `SET 'table-store.%s' = '...'`. "
- + "Alternatively, you can use a per-table root
dir "
- + "as `CREATE TABLE ${table} (...) WITH ('%s'
= '...')`",
- FILE_PATH.key(), FILE_PATH.key()));
- return new Path(
- options.get(FILE_PATH.key()),
- String.format(
- "root/%s.catalog/%s.db/%s",
- identifier.getCatalogName(),
- identifier.getDatabaseName(),
- identifier.getObjectName()));
- }
-
- private TableStore buildTableStore(Context context) {
+ TableStore buildTableStore(Context context) {
ResolvedCatalogTable catalogTable = context.getCatalogTable();
ResolvedSchema schema = catalogTable.getResolvedSchema();
RowType rowType = (RowType)
schema.toPhysicalRowDataType().getLogicalType();
- int[] primaryKeys = new int[0];
+ List<String> partitionKeys = catalogTable.getPartitionKeys();
+ int[] pkIndex = new int[0];
if (schema.getPrimaryKey().isPresent()) {
- primaryKeys =
+ pkIndex =
schema.getPrimaryKey().get().getColumns().stream()
.mapToInt(rowType.getFieldNames()::indexOf)
.toArray();
@@ -274,9 +254,9 @@ public class TableStoreFactory
return new TableStore(Configuration.fromMap(catalogTable.getOptions()))
.withTableIdentifier(context.getObjectIdentifier())
.withSchema(rowType)
- .withPrimaryKeys(primaryKeys)
+ .withPrimaryKeys(pkIndex)
.withPartitions(
- catalogTable.getPartitionKeys().stream()
+ partitionKeys.stream()
.mapToInt(rowType.getFieldNames()::indexOf)
.toArray());
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
index 29698a7..26f3c1c 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -78,7 +79,9 @@ public class CreateTableITCase extends TableStoreTestBase {
assertThat(((TableEnvironmentImpl)
tEnv).getCatalogManager().getTable(tableIdentifier))
.isPresent();
// check table store
- assertThat(Paths.get(rootPath,
getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+ assertThat(
+ Paths.get(rootPath,
FileStoreOptions.relativeTablePath(tableIdentifier))
+ .toFile())
.exists();
// check log store
assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore);
@@ -128,7 +131,9 @@ public class CreateTableITCase extends TableStoreTestBase {
}
} else if (expectedResult.expectedMessage.startsWith("Failed to create
file store path.")) {
// failed when creating file store
- Paths.get(rootPath,
getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
+ Paths.get(rootPath,
FileStoreOptions.relativeTablePath(tableIdentifier))
+ .toFile()
+ .mkdirs();
} else if (expectedResult.expectedMessage.startsWith("Failed to create
kafka topic.")) {
// failed when creating log store
createTopicIfNotExists(tableIdentifier.asSummaryString(),
BUCKET.defaultValue());
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
index 2e0f1fc..eb65439 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -25,6 +25,7 @@ import
org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -79,7 +80,9 @@ public class DropTableITCase extends TableStoreTestBase {
assertThat(((TableEnvironmentImpl)
tEnv).getCatalogManager().getTable(tableIdentifier))
.isNotPresent();
// check table store
- assertThat(Paths.get(rootPath,
getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+ assertThat(
+ Paths.get(rootPath,
FileStoreOptions.relativeTablePath(tableIdentifier))
+ .toFile())
.doesNotExist();
// check log store
assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index a0d5185..95e1f9b 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -117,7 +117,7 @@ public class FileStoreITCase extends AbstractTestBase {
@Test
public void testPartitioned() throws Exception {
- store.withPartitions(new int[] {1});
+ store.withPrimaryKeys(new int[] {1, 2}).withPartitions(new int[] {1});
// write
store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
@@ -153,7 +153,7 @@ public class FileStoreITCase extends AbstractTestBase {
@Test
public void testOverwrite() throws Exception {
Assume.assumeTrue(isBatch);
- store.withPartitions(new int[] {1});
+ store.withPrimaryKeys(new int[] {1, 2}).withPartitions(new int[] {1});
// write
store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
@@ -193,7 +193,7 @@ public class FileStoreITCase extends AbstractTestBase {
@Test
public void testPartitionedNonKey() throws Exception {
- store.withPartitions(new int[] {1}).withPrimaryKeys(new int[0]);
+ store.withPrimaryKeys(new int[0]).withPartitions(new int[] {1});
// write
store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 3bdf8cc..819e018 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -86,7 +87,7 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
}
}
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedResult.expectedRecords);
- String relativeFilePath =
getRelativeFileStoreTablePath(tableIdentifier);
+ String relativeFilePath =
FileStoreOptions.relativeTablePath(tableIdentifier);
// check snapshot file path
assertThat(Paths.get(rootPath, relativeFilePath,
"snapshot")).exists();
// check manifest file path
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
index 2e894bd..bfde9aa 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -27,8 +27,9 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -39,15 +40,19 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;
+import static
org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static
org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
@@ -60,7 +65,7 @@ public class TableStoreFactoryTest {
private static final ObjectIdentifier TABLE_IDENTIFIER =
ObjectIdentifier.of("catalog", "database", "table");
- private final ManagedTableFactory tableStoreFactory = new
TableStoreFactory();
+ private final TableStoreFactory tableStoreFactory = new
TableStoreFactory();
@TempDir private static java.nio.file.Path sharedTempDir;
private DynamicTableFactory.Context context;
@@ -84,11 +89,7 @@ public class TableStoreFactoryTest {
Path expectedPath =
Paths.get(
sharedTempDir.toAbsolutePath().toString(),
- String.format(
- "root/%s.catalog/%s.db/%s",
- TABLE_IDENTIFIER.getCatalogName(),
- TABLE_IDENTIFIER.getDatabaseName(),
- TABLE_IDENTIFIER.getObjectName()));
+ relativeTablePath(TABLE_IDENTIFIER));
boolean exist = expectedPath.toFile().exists();
if (ignoreIfExists || !exist) {
tableStoreFactory.onCreateTable(context, ignoreIfExists);
@@ -119,11 +120,7 @@ public class TableStoreFactoryTest {
Path expectedPath =
Paths.get(
sharedTempDir.toAbsolutePath().toString(),
- String.format(
- "root/%s.catalog/%s.db/%s",
- TABLE_IDENTIFIER.getCatalogName(),
- TABLE_IDENTIFIER.getDatabaseName(),
- TABLE_IDENTIFIER.getObjectName()));
+ relativeTablePath(TABLE_IDENTIFIER));
boolean exist = expectedPath.toFile().exists();
if (exist || ignoreIfNotExists) {
tableStoreFactory.onDropTable(context, ignoreIfNotExists);
@@ -161,23 +158,41 @@ public class TableStoreFactoryTest {
.containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
}
- @Test
- public void testTablePath() {
- Map<String, String> options = of(FILE_PATH.key(), "dummy:/foo/bar");
- assertThat(TableStoreFactory.tablePath(options, TABLE_IDENTIFIER))
- .isEqualTo(
- new org.apache.flink.core.fs.Path(
-
"dummy:/foo/bar/root/catalog.catalog/database.db/table"));
-
- assertThatThrownBy(
- () ->
TableStoreFactory.tablePath(Collections.emptyMap(), TABLE_IDENTIFIER))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "Failed to create file store path. "
- + "Please specify a root dir by setting
session level configuration "
- + "as `SET 'table-store.file.path' = '...'`. "
- + "Alternatively, you can use a per-table root
dir "
- + "as `CREATE TABLE ${table} (...) WITH
('file.path' = '...')`");
+ @ParameterizedTest
+ @MethodSource("providingResolvedTable")
+ public void testBuildTableStore(
+ RowType rowType,
+ List<String> partitions,
+ int[] pkIndex,
+ TableStoreTestBase.ExpectedResult expectedResult) {
+ ResolvedCatalogTable catalogTable =
+ createResolvedTable(Collections.emptyMap(), rowType,
partitions, pkIndex);
+ context =
+ new FactoryUtil.DefaultDynamicTableContext(
+ TABLE_IDENTIFIER,
+ catalogTable,
+ Collections.emptyMap(),
+ Configuration.fromMap(Collections.emptyMap()),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ if (expectedResult.success) {
+ TableStore tableStore = tableStoreFactory.buildTableStore(context);
+
assertThat(tableStore.partitioned()).isEqualTo(catalogTable.isPartitioned());
+ assertThat(tableStore.valueCountMode())
+
.isEqualTo(catalogTable.getResolvedSchema().getPrimaryKeyIndexes().length == 0);
+
+ // check primary key doesn't contain partition
+ if (tableStore.partitioned() && !tableStore.valueCountMode()) {
+ assertThat(
+ tableStore.primaryKeys().stream()
+ .noneMatch(pk ->
tableStore.partitionKeys().contains(pk)))
+ .isTrue();
+ }
+ } else {
+ assertThatThrownBy(() ->
tableStoreFactory.buildTableStore(context))
+ .isInstanceOf(expectedResult.expectedType)
+ .hasMessageContaining(expectedResult.expectedMessage);
+ }
}
// ~ Tools
------------------------------------------------------------------
@@ -256,6 +271,44 @@ public class TableStoreFactoryTest {
Arguments.of(enrichedOptions, false));
}
+ private static Stream<Arguments> providingResolvedTable() {
+ RowType rowType = TestKeyValueGenerator.ROW_TYPE;
+ // success case
+ Arguments arg0 =
+ Arguments.of(
+ rowType,
+ Arrays.asList("dt", "hr"),
+ new int[] {0, 1, 2}, // pk is [dt, hr, shopId]
+ new TableStoreTestBase.ExpectedResult().success(true));
+
+ // failed case: pk doesn't contain partition key
+ Arguments arg1 =
+ Arguments.of(
+ rowType,
+ Arrays.asList("dt", "hr"),
+ new int[] {2}, // pk is [shopId]
+ new TableStoreTestBase.ExpectedResult()
+ .success(false)
+ .expectedType(IllegalStateException.class)
+ .expectedMessage(
+ "Primary key constraint [shopId]
should include all partition fields [dt, hr]"));
+
+ // failed case: pk is same as partition key
+ Arguments arg2 =
+ Arguments.of(
+ rowType,
+ Arrays.asList("dt", "hr", "shopId"),
+ new int[] {0, 1, 2}, // pk is [dt, hr, shopId]
+ new TableStoreTestBase.ExpectedResult()
+ .success(false)
+ .expectedType(IllegalStateException.class)
+ .expectedMessage(
+ "Primary key constraint [dt, hr,
shopId] should not be same with partition fields [dt, hr, shopId],"
+ + " this will result in only
one record in a partition"));
+
+ return Stream.of(arg0, arg1, arg2);
+ }
+
private static Map<String, String> addPrefix(
Map<String, String> options, String prefix, Predicate<String>
predicate) {
Map<String, String> newOptions = new HashMap<>();
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index e3f5f1c..0fe91af 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -146,15 +147,7 @@ public abstract class TableStoreTestBase extends
KafkaTableTestBase {
protected void deleteTablePath() {
FileUtils.deleteQuietly(
- Paths.get(rootPath,
getRelativeFileStoreTablePath(tableIdentifier)).toFile());
- }
-
- protected static String getRelativeFileStoreTablePath(ObjectIdentifier
tableIdentifier) {
- return String.format(
- "root/%s.catalog/%s.db/%s",
- tableIdentifier.getCatalogName(),
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName());
+ Paths.get(rootPath,
FileStoreOptions.relativeTablePath(tableIdentifier)).toFile());
}
/** Expected result wrapper. */
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 38a4b5a..d12848f 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -85,17 +85,24 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
}
private void innerTest(
- String name, boolean isBatch, boolean partitioned, boolean
transaction, boolean keyed)
+ String name, boolean isBatch, boolean partitioned, boolean
transaction, boolean hasPk)
throws Exception {
StreamExecutionEnvironment env = isBatch ? buildBatchEnv() :
buildStreamEnv();
// in eventual mode, failure will result in duplicate data
TableStore store = buildTableStore(isBatch || !transaction,
TEMPORARY_FOLDER);
if (partitioned) {
+ if (hasPk) {
+ store.withPrimaryKeys(new int[] {1, 2});
+ } else {
+ store.withPrimaryKeys(new int[0]);
+ }
store.withPartitions(new int[] {1});
+ } else {
+ store.withPartitions(new int[0]);
}
- if (!keyed) {
+ if (!hasPk) {
store.withPrimaryKeys(new int[0]);
}
@@ -109,7 +116,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
? LogOptions.LogConsistency.TRANSACTIONAL
: LogOptions.LogConsistency.EVENTUAL,
TABLE_TYPE,
- keyed ? new int[] {2} : new int[0]);
+ hasPk ? new int[] {2} : new int[0]);
KafkaLogStoreFactory factory = discoverKafkaLogFactory();
KafkaLogSinkProvider sinkProvider =
factory.createSinkProvider(context, SINK_CONTEXT);
@@ -130,7 +137,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
List<Row> results =
executeAndCollect(store.sourceBuilder().build(env));
Row[] expected;
- if (keyed) {
+ if (hasPk) {
expected =
partitioned
? new Row[] {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index aec50ed..c4b9c38 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -26,10 +26,12 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.time.Duration;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -129,13 +131,28 @@ public class FileStoreOptions implements Serializable {
}
public Path path(ObjectIdentifier tableIdentifier) {
- return new Path(
- options.get(FILE_PATH),
+ return path(options.toMap(), tableIdentifier);
+ }
+
+ public static Path path(Map<String, String> options, ObjectIdentifier
tableIdentifier) {
+ Preconditions.checkArgument(
+ options.containsKey(FILE_PATH.key()),
String.format(
- "root/%s.catalog/%s.db/%s",
- tableIdentifier.getCatalogName(),
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName()));
+ "Failed to create file store path. "
+ + "Please specify a root dir by setting
session level configuration "
+ + "as `SET 'table-store.%s' = '...'`. "
+ + "Alternatively, you can use a per-table root
dir "
+ + "as `CREATE TABLE ${table} (...) WITH ('%s'
= '...')`",
+ FILE_PATH.key(), FILE_PATH.key()));
+ return new Path(options.get(FILE_PATH.key()),
relativeTablePath(tableIdentifier));
+ }
+
+ public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+ return String.format(
+ "%s.catalog/%s.db/%s",
+ tableIdentifier.getCatalogName(),
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName());
}
public FileFormat fileFormat() {