This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c46c40b9d [core] Unify delete&update&mergeinto for merge engines 
(#3208)
c46c40b9d is described below

commit c46c40b9d69019a7faba3532834706ae756bed8c
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 15 18:58:29 2024 +0800

    [core] Unify delete&update&mergeinto for merge engines (#3208)
---
 docs/content/primary-key-table/merge-engine.md     |   6 -
 .../main/java/org/apache/paimon/CoreOptions.java   |  40 +----
 .../apache/paimon/flink/action/DeleteAction.java   |  12 +-
 .../paimon/flink/action/MergeIntoAction.java       |  18 +-
 .../SupportsRowLevelOperationFlinkTableSink.java   |  15 +-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 192 ++++++---------------
 6 files changed, 71 insertions(+), 212 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine.md 
b/docs/content/primary-key-table/merge-engine.md
index 7ae7035e4..bc6c1ee35 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -34,12 +34,6 @@ result in strange behavior. When the input is out of order, 
we recommend that yo
 [Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field" 
>}}) to correct disorder.
 {{< /hint >}}
 
-{{< hint info >}}
-Some compute engines support row level update and delete in batch mode but not 
all merge engines support them.
-- Support batch update merge engines: `deduplicate` and `first-row`.
-- Support batch delete merge engines: `deduplicate`.
-{{< /hint >}}
-
 ## Deduplicate
 
 `deduplicate` merge engine is the default merge engine. Paimon will only keep 
the latest record and throw away other records with the same primary keys.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b73133bae..37bd0c217 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1802,28 +1802,20 @@ public class CoreOptions implements Serializable {
 
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
-        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", 
true, true),
+        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
 
-        PARTIAL_UPDATE("partial-update", "Partial update non-null fields.", 
false, false),
+        PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
 
-        AGGREGATE("aggregation", "Aggregate fields with same primary key.", 
false, false),
+        AGGREGATE("aggregation", "Aggregate fields with same primary key."),
 
-        FIRST_ROW("first-row", "De-duplicate and keep the first row.", true, 
false);
+        FIRST_ROW("first-row", "De-duplicate and keep the first row.");
 
         private final String value;
         private final String description;
-        private final boolean supportBatchUpdate;
-        private final boolean supportBatchDelete;
-
-        MergeEngine(
-                String value,
-                String description,
-                boolean supportBatchUpdate,
-                boolean supportBatchDelete) {
+
+        MergeEngine(String value, String description) {
             this.value = value;
             this.description = description;
-            this.supportBatchUpdate = supportBatchUpdate;
-            this.supportBatchDelete = supportBatchDelete;
         }
 
         @Override
@@ -1835,26 +1827,6 @@ public class CoreOptions implements Serializable {
         public InlineElement getDescription() {
             return text(description);
         }
-
-        public boolean supportBatchUpdate() {
-            return supportBatchUpdate;
-        }
-
-        public boolean supportBatchDelete() {
-            return supportBatchDelete;
-        }
-
-        public static List<MergeEngine> supportBatchUpdateEngines() {
-            return Arrays.stream(MergeEngine.values())
-                    .filter(MergeEngine::supportBatchUpdate)
-                    .collect(Collectors.toList());
-        }
-
-        public static List<MergeEngine> supportBatchDeleteEngines() {
-            return Arrays.stream(MergeEngine.values())
-                    .filter(MergeEngine::supportBatchDelete)
-                    .collect(Collectors.toList());
-        }
     }
 
     /** Specifies the startup mode for log consumer. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index ad3b18ccb..c275ce6f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
@@ -35,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+
 /** Delete from table action for Flink. */
 public class DeleteAction extends TableActionBase {
 
@@ -55,14 +56,11 @@ public class DeleteAction extends TableActionBase {
     @Override
     public void run() throws Exception {
         CoreOptions.MergeEngine mergeEngine = 
CoreOptions.fromMap(table.options()).mergeEngine();
-        Preconditions.checkArgument(mergeEngine.supportBatchDelete(), "");
-
-        if (!mergeEngine.supportBatchDelete()) {
+        if (mergeEngine != DEDUPLICATE) {
             throw new UnsupportedOperationException(
                     String.format(
-                            "Delete is executed in batch mode, but merge 
engine %s can not support batch delete."
-                                    + " Support batch delete merge engines 
are: %s.",
-                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+                            "Delete is executed in batch mode, but merge 
engine %s can not support batch delete.",
+                            mergeEngine));
         }
 
         LOG.debug("Run delete action with filter '{}'.", filter);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index ba232b019..f99b22973 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -45,6 +45,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 import static 
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 
 /**
@@ -223,20 +225,10 @@ public class MergeIntoAction extends TableActionBase {
         }
 
         CoreOptions.MergeEngine mergeEngine = 
CoreOptions.fromMap(table.options()).mergeEngine();
-        if ((matchedUpsert || notMatchedUpsert) && 
!mergeEngine.supportBatchUpdate()) {
+        boolean supportMergeInto = mergeEngine == DEDUPLICATE || mergeEngine 
== PARTIAL_UPDATE;
+        if (!supportMergeInto) {
             throw new UnsupportedOperationException(
-                    String.format(
-                            "merge-into is executed in batch mode, and you 
have set matched_upsert or not_matched_by_source_upsert."
-                                    + " But merge engine %s can not support 
batch update. Support batch update merge engines are: %s.",
-                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
-        }
-
-        if ((matchedDelete || notMatchedDelete) && 
!mergeEngine.supportBatchDelete()) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "merge-into is executed in batch mode, and you 
have set matched_delete or not_matched_by_source_delete."
-                                    + " But merge engine %s can not support 
batch delete. Support batch delete merge engines are: %s.",
-                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+                    String.format("Merge engine %s can not support 
merge-into.", mergeEngine));
         }
 
         if ((matchedUpsert && matchedDelete)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 5d8ed8c74..c81804a23 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -59,6 +59,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 
 /** Flink table sink that supports row level update and delete. */
 public abstract class SupportsRowLevelOperationFlinkTableSink extends 
FlinkTableSinkBase
@@ -115,11 +117,10 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
                 });
 
         MergeEngine mergeEngine = options.get(MERGE_ENGINE);
-        if (!mergeEngine.supportBatchUpdate()) {
+        boolean supportUpdate = mergeEngine == DEDUPLICATE || mergeEngine == 
PARTIAL_UPDATE;
+        if (!supportUpdate) {
             throw new UnsupportedOperationException(
-                    String.format(
-                            "Merge engine %s can not support batch update. 
Support batch update merge engines are: %s.",
-                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
+                    String.format("Merge engine %s can not support batch 
update.", mergeEngine));
         }
 
         // Even with partial-update we still need all columns. Because the 
topology
@@ -184,11 +185,9 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
         }
 
         MergeEngine mergeEngine = 
CoreOptions.fromMap(table.options()).mergeEngine();
-        if (!mergeEngine.supportBatchDelete()) {
+        if (mergeEngine != DEDUPLICATE) {
             throw new UnsupportedOperationException(
-                    String.format(
-                            "Merge engine %s can not support batch delete. 
Support batch delete merge engines are: %s.",
-                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+                    String.format("Merge engine %s can not support batch 
delete.", mergeEngine));
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index c2235c85b..589854c78 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -54,16 +54,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -72,7 +71,6 @@ import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
-import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -1330,14 +1328,11 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     // 
----------------------------------------------------------------------------------------------------------------
 
     @ParameterizedTest
-    @EnumSource(CoreOptions.MergeEngine.class)
-    public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) 
throws Exception {
+    @ValueSource(strings = {"deduplicate", "partial-update"})
+    public void testUpdateWithPrimaryKey(String mergeEngine) throws Exception {
         // Step1: define table schema
         Map<String, String> options = new HashMap<>();
-        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
-        if (mergeEngine == FIRST_ROW) {
-            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
-        }
+        options.put(MERGE_ENGINE.key(), mergeEngine);
         String table =
                 createTable(
                         Arrays.asList(
@@ -1357,23 +1352,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                 "(3, 'Euro', 114, '2022-01-01')",
                 "(3, 'Euro', 119, '2022-01-02')");
 
-        // Step3: prepare expected data.
-        String rowKind = mergeEngine == FIRST_ROW ? "+I" : "+U";
-        List<Row> expectedRecords =
-                Arrays.asList(
-                        // part = 2022-01-01
-                        changelogRow("+I", 1L, "US Dollar", 114L, 
"2022-01-01"),
-                        changelogRow(
-                                rowKind,
-                                2L,
-                                mergeEngine == FIRST_ROW ? "UNKNOWN" : "Yen",
-                                mergeEngine == FIRST_ROW ? -1 : 1L,
-                                "2022-01-01"),
-                        changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
-                        // part = 2022-01-02
-                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
-
-        // Step4: prepare update statement
+        // Step3: prepare update statement
         String updateStatement =
                 String.format(
                         "UPDATE %s "
@@ -1382,15 +1361,19 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                 + "WHERE currency = 'UNKNOWN' and dt = 
'2022-01-01'",
                         table);
 
-        // Step5: execute update statement and verify result
-        if (mergeEngine.supportBatchUpdate()) {
-            bEnv.executeSql(updateStatement).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(updateStatement).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        // Step4: execute update statement and verify result
+        bEnv.executeSql(updateStatement).await();
+        String querySql = String.format("SELECT * FROM %s", table);
+        String rowKind = mergeEngine.equals("deduplicate") ? "+U" : "+I";
+        testBatchRead(
+                querySql,
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", 1L, "US Dollar", 114L, 
"2022-01-01"),
+                        changelogRow(rowKind, 2L, "Yen", 1L, "2022-01-01"),
+                        changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02")));
     }
 
     @Test
@@ -1504,18 +1487,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     // Delete statement
     // 
----------------------------------------------------------------------------------------------------------------
 
-    @ParameterizedTest
-    @EnumSource(CoreOptions.MergeEngine.class)
-    public void testDeleteWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) 
throws Exception {
-        Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
-        supportUpdateEngines.add(DEDUPLICATE);
-
+    @Test
+    public void testDeleteWithPrimaryKey() throws Exception {
         // Step1: define table schema
-        Map<String, String> options = new HashMap<>();
-        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
-        if (mergeEngine == FIRST_ROW) {
-            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
-        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1525,7 +1499,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                 "dt String"),
                         Arrays.asList("id", "dt"),
                         Collections.singletonList("dt"),
-                        options);
+                        Collections.emptyMap());
 
         // Step2: batch write some historical data
         insertInto(
@@ -1538,18 +1512,13 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         String deleteStatement = String.format("DELETE FROM %s WHERE currency 
= 'UNKNOWN'", table);
 
         // Step4: execute delete statement and verify result
-        List<Row> expectedRecords =
+        bEnv.executeSql(deleteStatement).await();
+        String querySql = String.format("SELECT * FROM %s", table);
+        testBatchRead(
+                querySql,
                 Arrays.asList(
                         changelogRow("+I", 1L, "US Dollar", 114L, 
"2022-01-01"),
-                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02")));
     }
 
     @Test
@@ -1578,28 +1547,13 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         String deleteStatement = String.format("DELETE FROM %s WHERE currency 
= 'UNKNOWN'", table);
 
         // Step4: execute delete statement and verify result
-        List<Row> expectedRecords =
-                Arrays.asList(
-                        changelogRow("+I", 1L, "US Dollar", 114L, 
"2022-01-01"),
-                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
-
         assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
                 
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
     }
 
-    @ParameterizedTest
-    @EnumSource(CoreOptions.MergeEngine.class)
-    public void testDeletePushDownWithPrimaryKey(CoreOptions.MergeEngine 
mergeEngine)
-            throws Exception {
-        Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
-        supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
-
+    @Test
+    public void testDeletePushDownWithPrimaryKey() throws Exception {
         // Step1: define table schema
-        Map<String, String> options = new HashMap<>();
-        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
-        if (mergeEngine == FIRST_ROW) {
-            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
-        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1609,7 +1563,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                 "dt String"),
                         Arrays.asList("id", "dt"),
                         Collections.singletonList("dt"),
-                        options);
+                        Collections.emptyMap());
 
         // Step2: batch write some historical data
         insertInto(
@@ -1635,51 +1589,24 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         changelogRow("+I", 6L, "CAD", 119L, "2022-01-03"),
                         changelogRow("+I", 7L, "INR", 119L, "2022-01-03"),
                         changelogRow("+I", 8L, "MOP", 119L, "2022-01-03"));
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement).await();
+        String querySql = String.format("SELECT * FROM %s", table);
+        testBatchRead(querySql, expectedRecords);
 
         // Test2 delete statement no where
         String deleteStatement2 = String.format("DELETE FROM %s", table);
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement2).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, Collections.emptyList());
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement2).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement2).await();
+        testBatchRead(String.format("SELECT * FROM %s", table), 
Collections.emptyList());
 
         // Test3 delete statement where pt
         String deleteStatement3 = String.format("DELETE FROM %s WHERE dt = 
'2022-01-03'", table);
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement3).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, Collections.emptyList());
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement3).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement3).await();
+        testBatchRead(String.format("SELECT * FROM %s", table), 
Collections.emptyList());
     }
 
-    @ParameterizedTest
-    @EnumSource(CoreOptions.MergeEngine.class)
-    public void testDeletePushDownWithPartitionKey(CoreOptions.MergeEngine 
mergeEngine)
-            throws Exception {
-        Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
-        supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
-
+    @Test
+    public void testDeletePushDownWithPartitionKey() throws Exception {
         // Step1: define table schema
-        Map<String, String> options = new HashMap<>();
-        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
-        if (mergeEngine == FIRST_ROW) {
-            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
-        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1690,7 +1617,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                 "hh String"),
                         Arrays.asList("id", "dt", "hh"),
                         Arrays.asList("dt", "hh"),
-                        options);
+                        Collections.emptyMap());
 
         // Step2: batch write some historical data
         insertInto(
@@ -1718,14 +1645,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         changelogRow("+I", 6L, "CAD", 119L, "2022-01-03", 
"16"),
                         changelogRow("+I", 7L, "INR", 119L, "2022-01-03", 
"17"),
                         changelogRow("+I", 8L, "MOP", 119L, "2022-01-03", 
"18"));
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement).await();
+        String querySql = String.format("SELECT * FROM %s", table);
+        testBatchRead(querySql, expectedRecords);
 
         // Step5: partition key not push down
         String deleteStatement1 =
@@ -1737,14 +1659,8 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         changelogRow("+I", 6L, "CAD", 119L, "2022-01-03", 
"16"),
                         changelogRow("+I", 7L, "INR", 119L, "2022-01-03", 
"17"),
                         changelogRow("+I", 8L, "MOP", 119L, "2022-01-03", 
"18"));
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement1).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords1);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement1).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement1).await();
+        testBatchRead(String.format("SELECT * FROM %s", table), 
expectedRecords1);
 
         // Step6: partition key delete push down
         String deleteStatement2 =
@@ -1757,14 +1673,8 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         changelogRow("+I", 2L, "UNKNOWN", -1L, "2022-01-01", 
"12"),
                         changelogRow("+I", 7L, "INR", 119L, "2022-01-03", 
"17"),
                         changelogRow("+I", 8L, "MOP", 119L, "2022-01-03", 
"18"));
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement2).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords2);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement2).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement2).await();
+        testBatchRead(String.format("SELECT * FROM %s", table), 
expectedRecords2);
 
         // Step8: partition key delete push down
         String deleteStatement3 = String.format("DELETE FROM %s WHERE dt = 
'2022-01-03'", table);
@@ -1774,14 +1684,8 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                 Arrays.asList(
                         changelogRow("+I", 1L, "US Dollar", 114L, 
"2022-01-01", "11"),
                         changelogRow("+I", 2L, "UNKNOWN", -1L, "2022-01-01", 
"12"));
-        if (supportUpdateEngines.contains(mergeEngine)) {
-            bEnv.executeSql(deleteStatement3).await();
-            String querySql = String.format("SELECT * FROM %s", table);
-            testBatchRead(querySql, expectedRecords3);
-        } else {
-            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement3).await())
-                    
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
-        }
+        bEnv.executeSql(deleteStatement3).await();
+        testBatchRead(String.format("SELECT * FROM %s", table), 
expectedRecords3);
     }
 
     // 
----------------------------------------------------------------------------------------------------------------

Reply via email to