This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 67dc98ede [core] Add validation to forbid setting 
streaming-read-overwrite with 'full-compaction' or 'lookup' changelog producer 
(#2180)
67dc98ede is described below

commit 67dc98ede8820ccbe89b794e6f47e500dca6bdd4
Author: yuzelin <[email protected]>
AuthorDate: Mon Oct 30 11:04:32 2023 +0800

    [core] Add validation to forbid setting streaming-read-overwrite with 
'full-compaction' or 'lookup' changelog producer (#2180)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  3 ++-
 .../org/apache/paimon/schema/SchemaValidation.java | 11 +++++++++++
 .../apache/paimon/flink/CatalogTableITCase.java    | 22 ++++++++++++++++++++++
 4 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0be1a41c6..fba0ce19d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -561,7 +561,7 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td><h5>streaming-read-overwrite</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Whether to read the changes from overwrite in streaming 
mode.</td>
+            <td>Whether to read the changes from overwrite in streaming mode. 
Cannot be set to true when changelog producer is full-compaction or lookup 
because it will read duplicated changes.</td>
         </tr>
         <tr>
             <td><h5>tag.automatic-creation</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 88243fc6b..643aee75f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -514,7 +514,8 @@ public class CoreOptions implements Serializable {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Whether to read the changes from overwrite in 
streaming mode.");
+                            "Whether to read the changes from overwrite in 
streaming mode. Cannot be set to true when "
+                                    + "changelog producer is full-compaction 
or lookup because it will read duplicated changes.");
 
     public static final ConfigOption<Boolean> DYNAMIC_PARTITION_OVERWRITE =
             key("dynamic-partition-overwrite")
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 440a8dc97..ed3f28e57 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -53,6 +53,7 @@ import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
 import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
 import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
 import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -87,6 +88,16 @@ public class SchemaValidation {
                             "Can not set %s on table without primary keys, 
please define primary keys.",
                             CHANGELOG_PRODUCER.key()));
         }
+        if (options.streamingReadOverwrite()
+                && (changelogProducer == ChangelogProducer.FULL_COMPACTION
+                        || changelogProducer == ChangelogProducer.LOOKUP)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Cannot set %s to true when changelog producer is 
%s or %s because it will read duplicated changes.",
+                            STREAMING_READ_OVERWRITE.key(),
+                            ChangelogProducer.FULL_COMPACTION,
+                            ChangelogProducer.LOOKUP));
+        }
 
         checkArgument(
                 options.snapshotNumRetainMin() > 0,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index ce21b5035..a17725605 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.system.AllTableOptionsTable;
 import org.apache.paimon.table.system.CatalogOptionsTable;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.utils.BlockingIterator;
 
@@ -682,4 +683,25 @@ public class CatalogTableITCase extends CatalogITCaseBase {
             assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]", 
"[3]", "[4]");
         }
     }
+
+    @Test
+    public void testInvalidStreamingReadOverwrite() {
+        String ddl =
+                "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b STRING)"
+                        + "WITH ('changelog-producer' = '%s', 
'streaming-read-overwrite' = 'true')";
+
+        assertThatThrownBy(() -> sql(ddl, "full-compaction"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot set streaming-read-overwrite to true 
when changelog producer "
+                                        + "is full-compaction or lookup 
because it will read duplicated changes."));
+
+        assertThatThrownBy(() -> sql(ddl, "lookup"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot set streaming-read-overwrite to true 
when changelog producer "
+                                        + "is full-compaction or lookup 
because it will read duplicated changes."));
+    }
 }

Reply via email to