This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e8509a429 [core] Validate changelog producer on table without primary keys (#1784)
e8509a429 is described below

commit e8509a4294932ea944fd6203583c252e9c3e6ef4
Author: GuojunLi <[email protected]>
AuthorDate: Fri Aug 11 11:36:27 2023 +0800

    [core] Validate changelog producer on table without primary keys (#1784)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  9 +++++++++
 .../apache/paimon/flink/CatalogTableITCase.java    | 17 ++++++++++++++++
 .../org/apache/paimon/spark/SparkReadITCase.java   | 23 ++++++++++++++++++++++
 5 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fc9ca8900..383501f8f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -48,7 +48,7 @@ under the License.
             <td><h5>changelog-producer</h5></td>
             <td style="word-wrap: break-word;">none</td>
             <td><p>Enum</p></td>
-            <td>Whether to double write to a changelog file. This changelog 
file keeps the details of data changes, it can be read directly during stream 
reads.<br /><br />Possible values:<ul><li>"none": No changelog 
file.</li><li>"input": Double write to a changelog file when flushing memory 
table, the changelog is from input.</li><li>"full-compaction": Generate 
changelog files with each full compaction.</li><li>"lookup": Generate changelog 
files through 'lookup' before committing the d [...]
+            <td>Whether to double write to a changelog file. This changelog 
file keeps the details of data changes, it can be read directly during stream 
reads. This can be applied to tables with primary keys. <br /><br />Possible 
values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a 
changelog file when flushing memory table, the changelog is from 
input.</li><li>"full-compaction": Generate changelog files with each full 
compaction.</li><li>"lookup": Generate change [...]
         </tr>
         <tr>
             <td><h5>changelog-producer.row-deduplicate</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index cea7f74a0..1e1408068 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -378,7 +378,7 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to double write to a changelog file. "
                                     + "This changelog file keeps the details 
of data changes, "
-                                    + "it can be read directly during stream 
reads.");
+                                    + "it can be read directly during stream 
reads. This can be applied to tables with primary keys. ");
 
     public static final ConfigOption<Boolean> 
CHANGELOG_PRODUCER_ROW_DEDUPLICATE =
             key("changelog-producer.row-deduplicate")
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 95f7689e7..dd5929376 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -92,6 +92,15 @@ public class SchemaValidation {
                             WRITE_MODE.key(), APPEND_ONLY, 
CHANGELOG_PRODUCER.key()));
         }
 
+        if (options.writeMode() == WriteMode.AUTO
+                && schema.primaryKeys().isEmpty()
+                && changelogProducer != ChangelogProducer.NONE) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Can not set %s on table without primary keys, 
please define primary keys.",
+                            CHANGELOG_PRODUCER.key()));
+        }
+
         checkArgument(
                 options.snapshotNumRetainMin() > 0,
                 SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 5b40d2921..bce8c48ed 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -385,6 +385,23 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         "Can not set the write-mode to append-only and 
changelog-producer at the same time.");
     }
 
+    @Test
+    public void testChangelogProducerOnAppendOnlyTable() {
+        assertThatThrownBy(
+                        () -> sql("CREATE TABLE T (a INT) WITH 
('changelog-producer' = 'input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set changelog-producer on table without 
primary keys, please define primary keys.");
+
+        sql("CREATE TABLE T (a INT)");
+        assertThatThrownBy(() -> sql("ALTER TABLE T SET 
('changelog-producer'='input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set changelog-producer on table without 
primary keys, please define primary keys.");
+    }
+
     @Test
     public void testFileFormatPerLevel() {
         sql(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 00e30e598..4c5843947 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -312,6 +312,29 @@ public class SparkReadITCase extends SparkReadTestBase {
                         "Can not set the write-mode to append-only and 
changelog-producer at the same time.");
     }
 
+    @Test
+    public void testChangelogProducerOnAppendOnlyTable() {
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "CREATE TABLE T (a INT) TBLPROPERTIES 
('changelog-producer' = 'input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set changelog-producer on table without 
primary keys, please define primary keys.");
+
+        spark.sql("CREATE TABLE T (a INT)");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE T SET 
TBLPROPERTIES('changelog-producer' 'input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set changelog-producer on table without 
primary keys, please define primary keys.");
+    }
+
     @Test
     public void testCreateTableWithNullablePk() {
         spark.sql(

Reply via email to