This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new 525866c1 [FLINK-30587] Validate primary key in append-only table in ddl
525866c1 is described below
commit 525866c1711a7aa1b96d60e1b3e7c6786b89f1e1
Author: shammon <[email protected]>
AuthorDate: Mon Jan 9 10:08:44 2023 +0800
[FLINK-30587] Validate primary key in append-only table in ddl
This closes #462
---
.../org/apache/flink/table/store/CoreOptions.java | 10 ++++++++
.../table/store/file/schema/SchemaManagerTest.java | 29 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 7df5730f..21fc29e9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.format.FileFormat;
@@ -45,11 +46,13 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
import static
org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
import static
org.apache.flink.table.store.file.schema.TableSchema.SYSTEM_FIELD_NAMES;
import static org.apache.flink.util.Preconditions.checkState;
@@ -837,6 +840,13 @@ public class CoreOptions implements Serializable {
"Field name[%s] in schema cannot
start with [%s]",
f, KEY_FIELD_PREFIX));
});
+
+ // Cannot define any primary key in an append-only table.
+ if (!schema.primaryKeys().isEmpty() && Objects.equals(APPEND_ONLY,
options.writeMode())) {
+ throw new TableException(
+ "Cannot define any primary key in an append-only table.
Set 'write-mode'='change-log' if "
+ + "still want to keep the primary key
definition.");
+ }
}
private static void checkOptionExistInMode(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index fb4086a1..c38ce97f 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -19,7 +19,9 @@
package org.apache.flink.table.store.file.schema;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
@@ -262,4 +264,31 @@ public class SchemaManagerTest {
new UpdateSchema(rowType, partitionKeys, primaryKeys, options,
"");
retryArtificialException(() ->
manager.commitNewVersion(schemaWithPrimaryKeys));
}
+
+ @Test
+ public void testAppendOnlyTableWithPrimaryKey() throws Exception {
+ RowType newType = RowType.of(new IntType(), new BigIntType());
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString());
+ UpdateSchema changeLogSchema =
+ new UpdateSchema(newType, partitionKeys, primaryKeys, options,
"change-log table");
+ retryArtificialException(() ->
manager.commitNewVersion(changeLogSchema));
+
+ options.put(CoreOptions.WRITE_MODE.key(),
WriteMode.APPEND_ONLY.toString());
+ UpdateSchema updateSchema =
+ new UpdateSchema(
+ newType,
+ partitionKeys,
+ primaryKeys,
+ options,
+ "append-only table with primary key");
+ assertThatThrownBy(
+ () ->
+ retryArtificialException(
+ () ->
manager.commitNewVersion(updateSchema)))
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "Cannot define any primary key in an append-only
table. "
+ + "Set 'write-mode'='change-log' if still want
to keep the primary key definition.");
+ }
}