This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f883a85 [flink] integrate DELETE for flink-1.17 (#982)
79f883a85 is described below

commit 79f883a8547be22624c4aa9794860955a0d94549
Author: legendtkl <[email protected]>
AuthorDate: Mon May 8 11:43:45 2023 +0800

    [flink] integrate DELETE for flink-1.17 (#982)
---
 .../apache/paimon/flink/sink/FlinkTableSink.java   | 30 +++++++-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 83 ++++++++++++++++++++++
 2 files changed, 112 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 857f9ad98..17a503f17 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -29,6 +29,7 @@ import org.apache.paimon.table.Table;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.factories.DynamicTableFactory;
 
@@ -42,7 +43,8 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 
 /** Table sink to create sink. */
-public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLevelUpdate {
+public class FlinkTableSink extends FlinkTableSinkBase
+        implements SupportsRowLevelUpdate, SupportsRowLevelDelete {
 
     public FlinkTableSink(
             ObjectIdentifier tableIdentifier,
@@ -106,4 +108,30 @@ public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLev
                             table.getClass().getName()));
         }
     }
+
+    @Override
+    public RowLevelDeleteInfo applyRowLevelDelete(
+            @Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
+        if (table instanceof ChangelogWithKeyFileStoreTable) {
+            Options options = Options.fromMap(table.options());
+            if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
+                return new RowLevelDeleteInfo() {};
+            }
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "merge engine '%s' can not support delete, currently only %s can support delete.",
+                            options.get(MERGE_ENGINE), MergeEngine.DEDUPLICATE));
+        } else if (table instanceof AppendOnlyFileStoreTable
+                || table instanceof ChangelogValueCountFileStoreTable) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "table '%s' can not support delete, because there is no primary key.",
+                            table.getClass().getName()));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "%s can not support delete, because it is an unknown subclass of FileStoreTable.",
+                            table.getClass().getName()));
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 238c5012f..d989dfb6a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1570,6 +1570,89 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
     }
 
+    // ----------------------------------------------------------------------------------------------------------------
+    // Delete statement
+    // ----------------------------------------------------------------------------------------------------------------
+
+    @ParameterizedTest
+    @EnumSource(CoreOptions.MergeEngine.class)
+    public void testDeleteWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) throws Exception {
+        Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
+        supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+
+        // Step1: define table schema
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        String table =
+                createTable(
+                        Arrays.asList(
+                                "id BIGINT NOT NULL",
+                                "currency STRING",
+                                "rate BIGINT",
+                                "dt String"),
+                        Arrays.asList("id", "dt"),
+                        Collections.singletonList("dt"),
+                        options);
+
+        // Step2: batch write some historical data
+        insertInto(
+                table,
+                "(1, 'US Dollar', 114, '2022-01-01')",
+                "(2, 'UNKNOWN', -1, '2022-01-01')",
+                "(3, 'Euro', 119, '2022-01-02')");
+
+        // Step3: prepare delete statement
+        String deleteStatement = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
+
+        // Step4: execute delete statement and verify result
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        changelogRow("+I", 1L, "US Dollar", 114L, "2022-01-01"),
+                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
+        if (supportUpdateEngines.contains(mergeEngine)) {
+            bEnv.executeSql(deleteStatement).await();
+            String querySql = String.format("SELECT * FROM %s", table);
+            testBatchRead(querySql, expectedRecords);
+        } else {
+            assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
+                    .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(WriteMode.class)
+    public void testDeleteWithoutPrimaryKey(WriteMode writeMode) throws Exception {
+        // Step1: define table schema
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), writeMode.toString());
+        options.put(MERGE_ENGINE.key(), MERGE_ENGINE.defaultValue().toString());
+        String table =
+                createTable(
+                        Arrays.asList(
+                                "id BIGINT NOT NULL",
+                                "currency STRING",
+                                "rate BIGINT",
+                                "dt String"),
+                        Collections.emptyList(),
+                        Collections.singletonList("dt"),
+                        options);
+
+        // Step2: batch write some historical data
+        insertInto(
+                table,
+                "(1, 'US Dollar', 114, '2022-01-01')",
+                "(2, 'UNKNOWN', -1, '2022-01-01')",
+                "(3, 'Euro', 119, '2022-01-02')");
+
+        // Step3: prepare delete statement
+        String deleteStatement = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
+
+        // Step4: execute delete statement and verify result
+        assertThatThrownBy(() -> bEnv.executeSql(deleteStatement).await())
+                .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+    }
+
     // 
----------------------------------------------------------------------------------------------------------------
     // Tools
     // 
----------------------------------------------------------------------------------------------------------------

Reply via email to