This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79f883a85 [flink] integrate DELETE for flink-1.17 (#982)
79f883a85 is described below
commit 79f883a8547be22624c4aa9794860955a0d94549
Author: legendtkl <[email protected]>
AuthorDate: Mon May 8 11:43:45 2023 +0800
[flink] integrate DELETE for flink-1.17 (#982)
---
.../apache/paimon/flink/sink/FlinkTableSink.java | 30 +++++++-
.../apache/paimon/flink/ReadWriteTableITCase.java | 83 ++++++++++++++++++++++
2 files changed, 112 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 857f9ad98..17a503f17 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -29,6 +29,7 @@ import org.apache.paimon.table.Table;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
import org.apache.flink.table.factories.DynamicTableFactory;
@@ -42,7 +43,8 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
/** Table sink to create sink. */
-public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLevelUpdate {
+public class FlinkTableSink extends FlinkTableSinkBase
+ implements SupportsRowLevelUpdate, SupportsRowLevelDelete {
public FlinkTableSink(
ObjectIdentifier tableIdentifier,
@@ -106,4 +108,30 @@ public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLev
table.getClass().getName()));
}
}
+
+ @Override
+ public RowLevelDeleteInfo applyRowLevelDelete(
+ @Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
+ if (table instanceof ChangelogWithKeyFileStoreTable) {
+ Options options = Options.fromMap(table.options());
+ if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
+ return new RowLevelDeleteInfo() {};
+ }
+ throw new UnsupportedOperationException(
+ String.format(
+ "merge engine '%s' can not support delete, currently only %s can support delete.",
+ options.get(MERGE_ENGINE), MergeEngine.DEDUPLICATE));
+ } else if (table instanceof AppendOnlyFileStoreTable
+ || table instanceof ChangelogValueCountFileStoreTable) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "table '%s' can not support delete, because there is no primary key.",
+ table.getClass().getName()));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s can not support delete, because it is an unknown subclass of FileStoreTable.",
+ table.getClass().getName()));
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 238c5012f..d989dfb6a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1570,6 +1570,89 @@ public class ReadWriteTableITCase extends AbstractTestBase {
.satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
}
+ // ----------------------------------------------------------------------------------------------------------------
+ // Delete statement
+ // ----------------------------------------------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @EnumSource(CoreOptions.MergeEngine.class)
+ public void testDeleteWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) throws Exception {
+ Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
+ supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+
+ // Step1: define table schema
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+ options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ String table =
+ createTable(
+ Arrays.asList(
+ "id BIGINT NOT NULL",
+ "currency STRING",
+ "rate BIGINT",
+ "dt String"),
+ Arrays.asList("id", "dt"),
+ Collections.singletonList("dt"),
+ options);
+
+ // Step2: batch write some historical data
+ insertInto(
+ table,
+ "(1, 'US Dollar', 114, '2022-01-01')",
+ "(2, 'UNKNOWN', -1, '2022-01-01')",
+ "(3, 'Euro', 119, '2022-01-02')");
+
+ // Step3: prepare delete statement
+ String deleteStatement = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
+
+ // Step4: execute delete statement and verify result
+ List<Row> expectedRecords =
+ Arrays.asList(
+ changelogRow("+I", 1L, "US Dollar", 114L, "2022-01-01"),
+ changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
+ if (supportUpdateEngines.contains(mergeEngine)) {
+ bEnv.executeSql(deleteStatement).await();
+ String querySql = String.format("SELECT * FROM %s", table);
+ testBatchRead(querySql, expectedRecords);
+ } else {
+ assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
+ .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(WriteMode.class)
+ public void testDeleteWithoutPrimaryKey(WriteMode writeMode) throws Exception {
+ // Step1: define table schema
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_MODE.key(), writeMode.toString());
+ options.put(MERGE_ENGINE.key(), MERGE_ENGINE.defaultValue().toString());
+ String table =
+ createTable(
+ Arrays.asList(
+ "id BIGINT NOT NULL",
+ "currency STRING",
+ "rate BIGINT",
+ "dt String"),
+ Collections.emptyList(),
+ Collections.singletonList("dt"),
+ options);
+
+ // Step2: batch write some historical data
+ insertInto(
+ table,
+ "(1, 'US Dollar', 114, '2022-01-01')",
+ "(2, 'UNKNOWN', -1, '2022-01-01')",
+ "(3, 'Euro', 119, '2022-01-02')");
+
+ // Step3: prepare delete statement
+ String deleteStatement = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
+
+ // Step4: execute delete statement and verify result
+ assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
+ .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+ }
+
//
----------------------------------------------------------------------------------------------------------------
// Tools
//
----------------------------------------------------------------------------------------------------------------