This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c46c40b9d [core] Unify delete&update&mergeinto for merge engines (#3208)
c46c40b9d is described below
commit c46c40b9d69019a7faba3532834706ae756bed8c
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 15 18:58:29 2024 +0800
[core] Unify delete&update&mergeinto for merge engines (#3208)
---
docs/content/primary-key-table/merge-engine.md | 6 -
.../main/java/org/apache/paimon/CoreOptions.java | 40 +----
.../apache/paimon/flink/action/DeleteAction.java | 12 +-
.../paimon/flink/action/MergeIntoAction.java | 18 +-
.../SupportsRowLevelOperationFlinkTableSink.java | 15 +-
.../apache/paimon/flink/ReadWriteTableITCase.java | 192 ++++++---------------
6 files changed, 71 insertions(+), 212 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine.md b/docs/content/primary-key-table/merge-engine.md
index 7ae7035e4..bc6c1ee35 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -34,12 +34,6 @@ result in strange behavior. When the input is out of order, we recommend that yo
[Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field" >}}) to correct disorder.
{{< /hint >}}
-{{< hint info >}}
-Some compute engines support row level update and delete in batch mode but not all merge engines support them.
-- Support batch update merge engines: `deduplicate` and `first-row`.
-- Support batch delete merge engines: `deduplicate`.
-{{< /hint >}}
-
## Deduplicate
`deduplicate` merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b73133bae..37bd0c217 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1802,28 +1802,20 @@ public class CoreOptions implements Serializable {
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
- DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.",
true, true),
+ DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
- PARTIAL_UPDATE("partial-update", "Partial update non-null fields.",
false, false),
+ PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
- AGGREGATE("aggregation", "Aggregate fields with same primary key.",
false, false),
+ AGGREGATE("aggregation", "Aggregate fields with same primary key."),
- FIRST_ROW("first-row", "De-duplicate and keep the first row.", true,
false);
+ FIRST_ROW("first-row", "De-duplicate and keep the first row.");
private final String value;
private final String description;
- private final boolean supportBatchUpdate;
- private final boolean supportBatchDelete;
-
- MergeEngine(
- String value,
- String description,
- boolean supportBatchUpdate,
- boolean supportBatchDelete) {
+
+ MergeEngine(String value, String description) {
this.value = value;
this.description = description;
- this.supportBatchUpdate = supportBatchUpdate;
- this.supportBatchDelete = supportBatchDelete;
}
@Override
@@ -1835,26 +1827,6 @@ public class CoreOptions implements Serializable {
public InlineElement getDescription() {
return text(description);
}
-
- public boolean supportBatchUpdate() {
- return supportBatchUpdate;
- }
-
- public boolean supportBatchDelete() {
- return supportBatchDelete;
- }
-
- public static List<MergeEngine> supportBatchUpdateEngines() {
- return Arrays.stream(MergeEngine.values())
- .filter(MergeEngine::supportBatchUpdate)
- .collect(Collectors.toList());
- }
-
- public static List<MergeEngine> supportBatchDeleteEngines() {
- return Arrays.stream(MergeEngine.values())
- .filter(MergeEngine::supportBatchDelete)
- .collect(Collectors.toList());
- }
}
/** Specifies the startup mode for log consumer. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index ad3b18ccb..c275ce6f1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.utils.Preconditions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -35,6 +34,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+
/** Delete from table action for Flink. */
public class DeleteAction extends TableActionBase {
@@ -55,14 +56,11 @@ public class DeleteAction extends TableActionBase {
@Override
public void run() throws Exception {
CoreOptions.MergeEngine mergeEngine =
CoreOptions.fromMap(table.options()).mergeEngine();
- Preconditions.checkArgument(mergeEngine.supportBatchDelete(), "");
-
- if (!mergeEngine.supportBatchDelete()) {
+ if (mergeEngine != DEDUPLICATE) {
throw new UnsupportedOperationException(
String.format(
- "Delete is executed in batch mode, but merge
engine %s can not support batch delete."
- + " Support batch delete merge engines
are: %s.",
- mergeEngine,
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+ "Delete is executed in batch mode, but merge
engine %s can not support batch delete.",
+ mergeEngine));
}
LOG.debug("Run delete action with filter '{}'.", filter);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index ba232b019..f99b22973 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -45,6 +45,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
/**
@@ -223,20 +225,10 @@ public class MergeIntoAction extends TableActionBase {
}
CoreOptions.MergeEngine mergeEngine =
CoreOptions.fromMap(table.options()).mergeEngine();
- if ((matchedUpsert || notMatchedUpsert) &&
!mergeEngine.supportBatchUpdate()) {
+ boolean supportMergeInto = mergeEngine == DEDUPLICATE || mergeEngine
== PARTIAL_UPDATE;
+ if (!supportMergeInto) {
throw new UnsupportedOperationException(
- String.format(
- "merge-into is executed in batch mode, and you
have set matched_upsert or not_matched_by_source_upsert."
- + " But merge engine %s can not support
batch update. Support batch update merge engines are: %s.",
- mergeEngine,
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
- }
-
- if ((matchedDelete || notMatchedDelete) &&
!mergeEngine.supportBatchDelete()) {
- throw new UnsupportedOperationException(
- String.format(
- "merge-into is executed in batch mode, and you
have set matched_delete or not_matched_by_source_delete."
- + " But merge engine %s can not support
batch delete. Support batch delete merge engines are: %s.",
- mergeEngine,
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+ String.format("Merge engine %s can not support
merge-into.", mergeEngine));
}
if ((matchedUpsert && matchedDelete)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 5d8ed8c74..c81804a23 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -59,6 +59,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
/** Flink table sink that supports row level update and delete. */
public abstract class SupportsRowLevelOperationFlinkTableSink extends
FlinkTableSinkBase
@@ -115,11 +117,10 @@ public abstract class
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
});
MergeEngine mergeEngine = options.get(MERGE_ENGINE);
- if (!mergeEngine.supportBatchUpdate()) {
+ boolean supportUpdate = mergeEngine == DEDUPLICATE || mergeEngine ==
PARTIAL_UPDATE;
+ if (!supportUpdate) {
throw new UnsupportedOperationException(
- String.format(
- "Merge engine %s can not support batch update.
Support batch update merge engines are: %s.",
- mergeEngine,
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
+ String.format("Merge engine %s can not support batch
update.", mergeEngine));
}
// Even with partial-update we still need all columns. Because the
topology
@@ -184,11 +185,9 @@ public abstract class
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
}
MergeEngine mergeEngine =
CoreOptions.fromMap(table.options()).mergeEngine();
- if (!mergeEngine.supportBatchDelete()) {
+ if (mergeEngine != DEDUPLICATE) {
throw new UnsupportedOperationException(
- String.format(
- "Merge engine %s can not support batch delete.
Support batch delete merge engines are: %s.",
- mergeEngine,
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+ String.format("Merge engine %s can not support batch
delete.", mergeEngine));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index c2235c85b..589854c78 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -54,16 +54,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -72,7 +71,6 @@ import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
-import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -1330,14 +1328,11 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
//
----------------------------------------------------------------------------------------------------------------
@ParameterizedTest
- @EnumSource(CoreOptions.MergeEngine.class)
- public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine)
throws Exception {
+ @ValueSource(strings = {"deduplicate", "partial-update"})
+ public void testUpdateWithPrimaryKey(String mergeEngine) throws Exception {
// Step1: define table schema
Map<String, String> options = new HashMap<>();
- options.put(MERGE_ENGINE.key(), mergeEngine.toString());
- if (mergeEngine == FIRST_ROW) {
- options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
- }
+ options.put(MERGE_ENGINE.key(), mergeEngine);
String table =
createTable(
Arrays.asList(
@@ -1357,23 +1352,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"(3, 'Euro', 114, '2022-01-01')",
"(3, 'Euro', 119, '2022-01-02')");
- // Step3: prepare expected data.
- String rowKind = mergeEngine == FIRST_ROW ? "+I" : "+U";
- List<Row> expectedRecords =
- Arrays.asList(
- // part = 2022-01-01
- changelogRow("+I", 1L, "US Dollar", 114L,
"2022-01-01"),
- changelogRow(
- rowKind,
- 2L,
- mergeEngine == FIRST_ROW ? "UNKNOWN" : "Yen",
- mergeEngine == FIRST_ROW ? -1 : 1L,
- "2022-01-01"),
- changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
- // part = 2022-01-02
- changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
-
- // Step4: prepare update statement
+ // Step3: prepare update statement
String updateStatement =
String.format(
"UPDATE %s "
@@ -1382,15 +1361,19 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
+ "WHERE currency = 'UNKNOWN' and dt =
'2022-01-01'",
table);
- // Step5: execute update statement and verify result
- if (mergeEngine.supportBatchUpdate()) {
- bEnv.executeSql(updateStatement).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(updateStatement).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ // Step4: execute update statement and verify result
+ bEnv.executeSql(updateStatement).await();
+ String querySql = String.format("SELECT * FROM %s", table);
+ String rowKind = mergeEngine.equals("deduplicate") ? "+U" : "+I";
+ testBatchRead(
+ querySql,
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", 1L, "US Dollar", 114L,
"2022-01-01"),
+ changelogRow(rowKind, 2L, "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", 3L, "Euro", 119L, "2022-01-02")));
}
@Test
@@ -1504,18 +1487,9 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
// Delete statement
//
----------------------------------------------------------------------------------------------------------------
- @ParameterizedTest
- @EnumSource(CoreOptions.MergeEngine.class)
- public void testDeleteWithPrimaryKey(CoreOptions.MergeEngine mergeEngine)
throws Exception {
- Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
- supportUpdateEngines.add(DEDUPLICATE);
-
+ @Test
+ public void testDeleteWithPrimaryKey() throws Exception {
// Step1: define table schema
- Map<String, String> options = new HashMap<>();
- options.put(MERGE_ENGINE.key(), mergeEngine.toString());
- if (mergeEngine == FIRST_ROW) {
- options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
- }
String table =
createTable(
Arrays.asList(
@@ -1525,7 +1499,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"dt String"),
Arrays.asList("id", "dt"),
Collections.singletonList("dt"),
- options);
+ Collections.emptyMap());
// Step2: batch write some historical data
insertInto(
@@ -1538,18 +1512,13 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
String deleteStatement = String.format("DELETE FROM %s WHERE currency
= 'UNKNOWN'", table);
// Step4: execute delete statement and verify result
- List<Row> expectedRecords =
+ bEnv.executeSql(deleteStatement).await();
+ String querySql = String.format("SELECT * FROM %s", table);
+ testBatchRead(
+ querySql,
Arrays.asList(
changelogRow("+I", 1L, "US Dollar", 114L,
"2022-01-01"),
- changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ changelogRow("+I", 3L, "Euro", 119L, "2022-01-02")));
}
@Test
@@ -1578,28 +1547,13 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
String deleteStatement = String.format("DELETE FROM %s WHERE currency
= 'UNKNOWN'", table);
// Step4: execute delete statement and verify result
- List<Row> expectedRecords =
- Arrays.asList(
- changelogRow("+I", 1L, "US Dollar", 114L,
"2022-01-01"),
- changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
-
assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
}
- @ParameterizedTest
- @EnumSource(CoreOptions.MergeEngine.class)
- public void testDeletePushDownWithPrimaryKey(CoreOptions.MergeEngine
mergeEngine)
- throws Exception {
- Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
- supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
-
+ @Test
+ public void testDeletePushDownWithPrimaryKey() throws Exception {
// Step1: define table schema
- Map<String, String> options = new HashMap<>();
- options.put(MERGE_ENGINE.key(), mergeEngine.toString());
- if (mergeEngine == FIRST_ROW) {
- options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
- }
String table =
createTable(
Arrays.asList(
@@ -1609,7 +1563,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"dt String"),
Arrays.asList("id", "dt"),
Collections.singletonList("dt"),
- options);
+ Collections.emptyMap());
// Step2: batch write some historical data
insertInto(
@@ -1635,51 +1589,24 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
changelogRow("+I", 6L, "CAD", 119L, "2022-01-03"),
changelogRow("+I", 7L, "INR", 119L, "2022-01-03"),
changelogRow("+I", 8L, "MOP", 119L, "2022-01-03"));
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement).await();
+ String querySql = String.format("SELECT * FROM %s", table);
+ testBatchRead(querySql, expectedRecords);
// Test2 delete statement no where
String deleteStatement2 = String.format("DELETE FROM %s", table);
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement2).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, Collections.emptyList());
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement2).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement2).await();
+ testBatchRead(String.format("SELECT * FROM %s", table),
Collections.emptyList());
// Test3 delete statement where pt
String deleteStatement3 = String.format("DELETE FROM %s WHERE dt =
'2022-01-03'", table);
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement3).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, Collections.emptyList());
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement3).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement3).await();
+ testBatchRead(String.format("SELECT * FROM %s", table),
Collections.emptyList());
}
- @ParameterizedTest
- @EnumSource(CoreOptions.MergeEngine.class)
- public void testDeletePushDownWithPartitionKey(CoreOptions.MergeEngine
mergeEngine)
- throws Exception {
- Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
- supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
-
+ @Test
+ public void testDeletePushDownWithPartitionKey() throws Exception {
// Step1: define table schema
- Map<String, String> options = new HashMap<>();
- options.put(MERGE_ENGINE.key(), mergeEngine.toString());
- if (mergeEngine == FIRST_ROW) {
- options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
- }
String table =
createTable(
Arrays.asList(
@@ -1690,7 +1617,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"hh String"),
Arrays.asList("id", "dt", "hh"),
Arrays.asList("dt", "hh"),
- options);
+ Collections.emptyMap());
// Step2: batch write some historical data
insertInto(
@@ -1718,14 +1645,9 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
changelogRow("+I", 6L, "CAD", 119L, "2022-01-03",
"16"),
changelogRow("+I", 7L, "INR", 119L, "2022-01-03",
"17"),
changelogRow("+I", 8L, "MOP", 119L, "2022-01-03",
"18"));
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement).await();
+ String querySql = String.format("SELECT * FROM %s", table);
+ testBatchRead(querySql, expectedRecords);
// Step5: partition key not push down
String deleteStatement1 =
@@ -1737,14 +1659,8 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
changelogRow("+I", 6L, "CAD", 119L, "2022-01-03",
"16"),
changelogRow("+I", 7L, "INR", 119L, "2022-01-03",
"17"),
changelogRow("+I", 8L, "MOP", 119L, "2022-01-03",
"18"));
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement1).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords1);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement1).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement1).await();
+ testBatchRead(String.format("SELECT * FROM %s", table),
expectedRecords1);
// Step6: partition key delete push down
String deleteStatement2 =
@@ -1757,14 +1673,8 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
changelogRow("+I", 2L, "UNKNOWN", -1L, "2022-01-01",
"12"),
changelogRow("+I", 7L, "INR", 119L, "2022-01-03",
"17"),
changelogRow("+I", 8L, "MOP", 119L, "2022-01-03",
"18"));
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement2).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords2);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement2).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement2).await();
+ testBatchRead(String.format("SELECT * FROM %s", table),
expectedRecords2);
// Step8: partition key delete push down
String deleteStatement3 = String.format("DELETE FROM %s WHERE dt =
'2022-01-03'", table);
@@ -1774,14 +1684,8 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
Arrays.asList(
changelogRow("+I", 1L, "US Dollar", 114L,
"2022-01-01", "11"),
changelogRow("+I", 2L, "UNKNOWN", -1L, "2022-01-01",
"12"));
- if (supportUpdateEngines.contains(mergeEngine)) {
- bEnv.executeSql(deleteStatement3).await();
- String querySql = String.format("SELECT * FROM %s", table);
- testBatchRead(querySql, expectedRecords3);
- } else {
- assertThatThrownBy(() -> bEnv.executeSql(deleteStatement3).await())
-
.satisfies(anyCauseMatches(UnsupportedOperationException.class));
- }
+ bEnv.executeSql(deleteStatement3).await();
+ testBatchRead(String.format("SELECT * FROM %s", table),
expectedRecords3);
}
//
----------------------------------------------------------------------------------------------------------------