This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e8509a429 [core] Validate changelog producer on table without primary keys (#1784)
e8509a429 is described below
commit e8509a4294932ea944fd6203583c252e9c3e6ef4
Author: GuojunLi <[email protected]>
AuthorDate: Fri Aug 11 11:36:27 2023 +0800
[core] Validate changelog producer on table without primary keys (#1784)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../org/apache/paimon/schema/SchemaValidation.java | 9 +++++++++
.../apache/paimon/flink/CatalogTableITCase.java | 17 ++++++++++++++++
.../org/apache/paimon/spark/SparkReadITCase.java | 23 ++++++++++++++++++++++
5 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fc9ca8900..383501f8f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -48,7 +48,7 @@ under the License.
<td><h5>changelog-producer</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
- <td>Whether to double write to a changelog file. This changelog
file keeps the details of data changes, it can be read directly during stream
reads.<br /><br />Possible values:<ul><li>"none": No changelog
file.</li><li>"input": Double write to a changelog file when flushing memory
table, the changelog is from input.</li><li>"full-compaction": Generate
changelog files with each full compaction.</li><li>"lookup": Generate changelog
files through 'lookup' before committing the d [...]
+ <td>Whether to double write to a changelog file. This changelog
file keeps the details of data changes, it can be read directly during stream
reads. This can be applied to tables with primary keys. <br /><br />Possible
values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a
changelog file when flushing memory table, the changelog is from
input.</li><li>"full-compaction": Generate changelog files with each full
compaction.</li><li>"lookup": Generate change [...]
</tr>
<tr>
<td><h5>changelog-producer.row-deduplicate</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index cea7f74a0..1e1408068 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -378,7 +378,7 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to double write to a changelog file. "
+ "This changelog file keeps the details
of data changes, "
- + "it can be read directly during stream
reads.");
+ + "it can be read directly during stream
reads. This can be applied to tables with primary keys. ");
public static final ConfigOption<Boolean>
CHANGELOG_PRODUCER_ROW_DEDUPLICATE =
key("changelog-producer.row-deduplicate")
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 95f7689e7..dd5929376 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -92,6 +92,15 @@ public class SchemaValidation {
WRITE_MODE.key(), APPEND_ONLY,
CHANGELOG_PRODUCER.key()));
}
+ if (options.writeMode() == WriteMode.AUTO
+ && schema.primaryKeys().isEmpty()
+ && changelogProducer != ChangelogProducer.NONE) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Can not set %s on table without primary keys,
please define primary keys.",
+ CHANGELOG_PRODUCER.key()));
+ }
+
checkArgument(
options.snapshotNumRetainMin() > 0,
SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 5b40d2921..bce8c48ed 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -385,6 +385,23 @@ public class CatalogTableITCase extends CatalogITCaseBase {
"Can not set the write-mode to append-only and
changelog-producer at the same time.");
}
+ @Test
+ public void testChangelogProducerOnAppendOnlyTable() {
+ assertThatThrownBy(
+ () -> sql("CREATE TABLE T (a INT) WITH
('changelog-producer' = 'input')"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Can not set changelog-producer on table without
primary keys, please define primary keys.");
+
+ sql("CREATE TABLE T (a INT)");
+ assertThatThrownBy(() -> sql("ALTER TABLE T SET
('changelog-producer'='input')"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Can not set changelog-producer on table without
primary keys, please define primary keys.");
+ }
+
@Test
public void testFileFormatPerLevel() {
sql(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 00e30e598..4c5843947 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -312,6 +312,29 @@ public class SparkReadITCase extends SparkReadTestBase {
"Can not set the write-mode to append-only and
changelog-producer at the same time.");
}
+ @Test
+ public void testChangelogProducerOnAppendOnlyTable() {
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "CREATE TABLE T (a INT) TBLPROPERTIES
('changelog-producer' = 'input')"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Can not set changelog-producer on table without
primary keys, please define primary keys.");
+
+ spark.sql("CREATE TABLE T (a INT)");
+
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "ALTER TABLE T SET
TBLPROPERTIES('changelog-producer' 'input')"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Can not set changelog-producer on table without
primary keys, please define primary keys.");
+ }
+
@Test
public void testCreateTableWithNullablePk() {
spark.sql(