This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a5d1b3c77 [flink] Add merge-engine check when executing row level 
batch update and delete (#3181)
a5d1b3c77 is described below

commit a5d1b3c77c2683f23bbc5c7e755dfbc7463a369a
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 15 10:03:02 2024 +0800

    [flink] Add merge-engine check when executing row level batch update and 
delete (#3181)
---
 docs/content/primary-key-table/merge-engine.md     |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 40 ++++++++--
 .../apache/paimon/flink/action/DeleteAction.java   | 14 ++++
 .../paimon/flink/action/MergeIntoAction.java       | 54 +++++++++++--
 .../flink/action/MergeIntoActionFactory.java       | 34 +-------
 .../paimon/flink/procedure/MergeIntoProcedure.java |  3 +-
 .../SupportsRowLevelOperationFlinkTableSink.java   | 90 +++++++++++-----------
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 17 ++--
 .../paimon/flink/action/MergeIntoActionITCase.java | 53 -------------
 9 files changed, 158 insertions(+), 153 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine.md 
b/docs/content/primary-key-table/merge-engine.md
index bc6c1ee35..7ae7035e4 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -34,6 +34,12 @@ result in strange behavior. When the input is out of order, 
we recommend that yo
 [Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field" 
>}}) to correct disorder.
 {{< /hint >}}
 
+{{< hint info >}}
+Some compute engines support row level update and delete in batch mode but not 
all merge engines support them.
+- Support batch update merge engines: `deduplicate` and `first-row`.
+- Support batch delete merge engines: `deduplicate`.
+{{< /hint >}}
+
 ## Deduplicate
 
 `deduplicate` merge engine is the default merge engine. Paimon will only keep 
the latest record and throw away other records with the same primary keys.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 54ab8b6f3..0d78d2ec8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1705,20 +1705,28 @@ public class CoreOptions implements Serializable {
 
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
-        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", 
true, true),
 
-        PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
+        PARTIAL_UPDATE("partial-update", "Partial update non-null fields.", 
false, false),
 
-        AGGREGATE("aggregation", "Aggregate fields with same primary key."),
+        AGGREGATE("aggregation", "Aggregate fields with same primary key.", 
false, false),
 
-        FIRST_ROW("first-row", "De-duplicate and keep the first row.");
+        FIRST_ROW("first-row", "De-duplicate and keep the first row.", true, 
false);
 
         private final String value;
         private final String description;
-
-        MergeEngine(String value, String description) {
+        private final boolean supportBatchUpdate;
+        private final boolean supportBatchDelete;
+
+        MergeEngine(
+                String value,
+                String description,
+                boolean supportBatchUpdate,
+                boolean supportBatchDelete) {
             this.value = value;
             this.description = description;
+            this.supportBatchUpdate = supportBatchUpdate;
+            this.supportBatchDelete = supportBatchDelete;
         }
 
         @Override
@@ -1730,6 +1738,26 @@ public class CoreOptions implements Serializable {
         public InlineElement getDescription() {
             return text(description);
         }
+
+        public boolean supportBatchUpdate() {
+            return supportBatchUpdate;
+        }
+
+        public boolean supportBatchDelete() {
+            return supportBatchDelete;
+        }
+
+        public static List<MergeEngine> supportBatchUpdateEngines() {
+            return Arrays.stream(MergeEngine.values())
+                    .filter(MergeEngine::supportBatchUpdate)
+                    .collect(Collectors.toList());
+        }
+
+        public static List<MergeEngine> supportBatchDeleteEngines() {
+            return Arrays.stream(MergeEngine.values())
+                    .filter(MergeEngine::supportBatchDelete)
+                    .collect(Collectors.toList());
+        }
     }
 
     /** Specifies the startup mode for log consumer. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 2fdb658ad..ad3b18ccb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.utils.Preconditions;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.data.GenericRowData;
@@ -51,6 +54,17 @@ public class DeleteAction extends TableActionBase {
 
     @Override
     public void run() throws Exception {
+        CoreOptions.MergeEngine mergeEngine = 
CoreOptions.fromMap(table.options()).mergeEngine();
+        Preconditions.checkArgument(mergeEngine.supportBatchDelete(), "");
+
+        if (!mergeEngine.supportBatchDelete()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Delete is executed in batch mode, but merge 
engine %s can not support batch delete."
+                                    + " Support batch delete merge engines 
are: %s.",
+                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+        }
+
         LOG.debug("Run delete action with filter '{}'.", filter);
 
         Table queriedTable =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 4ec5e6ef5..ba232b019 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -96,11 +97,11 @@ public class MergeIntoAction extends TableActionBase {
     private String mergeCondition;
 
     // actions to be taken
-    boolean matchedUpsert;
-    boolean notMatchedUpsert;
-    boolean matchedDelete;
-    boolean notMatchedDelete;
-    boolean insert;
+    private boolean matchedUpsert;
+    private boolean notMatchedUpsert;
+    private boolean matchedDelete;
+    private boolean notMatchedDelete;
+    private boolean insert;
 
     // upsert
     @Nullable String matchedUpsertCondition;
@@ -215,6 +216,49 @@ public class MergeIntoAction extends TableActionBase {
         return this;
     }
 
+    public void validate() {
+        if (!matchedUpsert && !notMatchedUpsert && !matchedDelete && 
!notMatchedDelete && !insert) {
+            throw new IllegalArgumentException(
+                    "Must specify at least one merge action. Run 'merge_into 
--help' for help.");
+        }
+
+        CoreOptions.MergeEngine mergeEngine = 
CoreOptions.fromMap(table.options()).mergeEngine();
+        if ((matchedUpsert || notMatchedUpsert) && 
!mergeEngine.supportBatchUpdate()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "merge-into is executed in batch mode, and you 
have set matched_upsert or not_matched_by_source_upsert."
+                                    + " But merge engine %s can not support 
batch update. Support batch update merge engines are: %s.",
+                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
+        }
+
+        if ((matchedDelete || notMatchedDelete) && 
!mergeEngine.supportBatchDelete()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "merge-into is executed in batch mode, and you 
have set matched_delete or not_matched_by_source_delete."
+                                    + " But merge engine %s can not support 
batch delete. Support batch delete merge engines are: %s.",
+                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+        }
+
+        if ((matchedUpsert && matchedDelete)
+                && (matchedUpsertCondition == null || matchedDeleteCondition 
== null)) {
+            throw new IllegalArgumentException(
+                    "If both matched-upsert and matched-delete actions are 
present, their conditions must both be present too.");
+        }
+
+        if ((notMatchedUpsert && notMatchedDelete)
+                && (notMatchedBySourceUpsertCondition == null
+                        || notMatchedBySourceDeleteCondition == null)) {
+            throw new IllegalArgumentException(
+                    "If both not-matched-by-source-upsert and 
not-matched-by--source-delete actions are present, "
+                            + "their conditions must both be present too.\n");
+        }
+
+        if (notMatchedBySourceUpsertSet != null && 
notMatchedBySourceUpsertSet.equals("*")) {
+            throw new IllegalArgumentException(
+                    "The '*' cannot be used in 
not_matched_by_source_upsert_set");
+        }
+    }
+
     @Override
     public void run() throws Exception {
         DataStream<RowData> dataStream = buildDataStream();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
index 3c07e3bd5..0ff0db327 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -112,7 +112,7 @@ public class MergeIntoActionFactory implements 
ActionFactory {
                     params.get(NOT_MATCHED_INSERT_VALUES));
         }
 
-        validate(action);
+        action.validate();
 
         return Optional.of(action);
     }
@@ -202,36 +202,4 @@ public class MergeIntoActionFactory implements 
ActionFactory {
         System.out.println(
                 "  It will find matched rows of target table that meet 
condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
     }
-
-    public static void validate(MergeIntoAction action) {
-        if (!action.matchedUpsert
-                && !action.notMatchedUpsert
-                && !action.matchedDelete
-                && !action.notMatchedDelete
-                && !action.insert) {
-            throw new IllegalArgumentException(
-                    "Must specify at least one merge action. Run 'merge_into 
--help' for help.");
-        }
-
-        if ((action.matchedUpsert && action.matchedDelete)
-                && (action.matchedUpsertCondition == null
-                        || action.matchedDeleteCondition == null)) {
-            throw new IllegalArgumentException(
-                    "If both matched-upsert and matched-delete actions are 
present, their conditions must both be present too.");
-        }
-
-        if ((action.notMatchedUpsert && action.notMatchedDelete)
-                && (action.notMatchedBySourceUpsertCondition == null
-                        || action.notMatchedBySourceDeleteCondition == null)) {
-            throw new IllegalArgumentException(
-                    "If both not-matched-by-source-upsert and 
not-matched-by--source-delete actions are present, "
-                            + "their conditions must both be present too.\n");
-        }
-
-        if (action.notMatchedBySourceUpsertSet != null
-                && action.notMatchedBySourceUpsertSet.equals("*")) {
-            throw new IllegalArgumentException(
-                    "The '*' cannot be used in 
not_matched_by_source_upsert_set");
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index 450646803..c4236ac7d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.MergeIntoAction;
-import org.apache.paimon.flink.action.MergeIntoActionFactory;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -222,7 +221,7 @@ public class MergeIntoProcedure extends ProcedureBase {
         }
 
         
action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
-        MergeIntoActionFactory.validate(action);
+        action.validate();
 
         DataStream<RowData> dataStream = action.buildDataStream();
         TableResult tableResult = action.batchSink(dataStream);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 0d2bd3962..5d8ed8c74 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
@@ -89,47 +90,45 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
         // Since only UPDATE_AFTER type messages can be received at present,
         // AppendOnlyFileStoreTable cannot correctly handle old data, so they 
are marked as
         // unsupported. Similarly, it is not allowed to update the primary key 
column when updating
-        // the column of PrimaryKeyFileStoreTable, because the old data cannot 
be handled
-        // correctly.
-        if (table.primaryKeys().size() > 0) {
-            Options options = Options.fromMap(table.options());
-            Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
-            updatedColumns.forEach(
-                    column -> {
-                        if (primaryKeys.contains(column.getName())) {
-                            String errMsg =
-                                    String.format(
-                                            "Updates to primary keys are not 
supported, primaryKeys (%s), updatedColumns (%s)",
-                                            primaryKeys,
-                                            updatedColumns.stream()
-                                                    .map(Column::getName)
-                                                    
.collect(Collectors.toList()));
-                            throw new UnsupportedOperationException(errMsg);
-                        }
-                    });
-            if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
-                    || options.get(MERGE_ENGINE) == 
MergeEngine.PARTIAL_UPDATE) {
-                // Even with partial-update we still need all columns. Because 
the topology
-                // structure is source -> cal -> constraintEnforcer -> sink, 
in the
-                // constraintEnforcer operator, the constraint check will be 
performed according to
-                // the index, not according to the column. So we can't return 
only some columns,
-                // which will cause problems like 
ArrayIndexOutOfBoundsException.
-                // TODO: return partial columns after FLINK-32001 is resolved.
-                return new RowLevelUpdateInfo() {};
-            }
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "%s can not support update, currently only %s of 
%s and %s can support update.",
-                            options.get(MERGE_ENGINE),
-                            MERGE_ENGINE.key(),
-                            MergeEngine.DEDUPLICATE,
-                            MergeEngine.PARTIAL_UPDATE));
-        } else {
+        // the column of PrimaryKeyFileStoreTable, because the old data cannot 
be handled correctly.
+        if (table.primaryKeys().isEmpty()) {
             throw new UnsupportedOperationException(
                     String.format(
                             "%s can not support update, because there is no 
primary key.",
                             table.getClass().getName()));
         }
+
+        Options options = Options.fromMap(table.options());
+        Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
+        updatedColumns.forEach(
+                column -> {
+                    if (primaryKeys.contains(column.getName())) {
+                        String errMsg =
+                                String.format(
+                                        "Updates to primary keys are not 
supported, primaryKeys (%s), updatedColumns (%s)",
+                                        primaryKeys,
+                                        updatedColumns.stream()
+                                                .map(Column::getName)
+                                                .collect(Collectors.toList()));
+                        throw new UnsupportedOperationException(errMsg);
+                    }
+                });
+
+        MergeEngine mergeEngine = options.get(MERGE_ENGINE);
+        if (!mergeEngine.supportBatchUpdate()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Merge engine %s can not support batch update. 
Support batch update merge engines are: %s.",
+                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
+        }
+
+        // Even with partial-update we still need all columns. Because the 
topology
+        // structure is source -> cal -> constraintEnforcer -> sink, in the
+        // constraintEnforcer operator, the constraint check will be performed 
according to
+        // the index, not according to the column. So we can't return only 
some columns,
+        // which will cause problems like ArrayIndexOutOfBoundsException.
+        // TODO: return partial columns after FLINK-32001 is resolved.
+        return new RowLevelUpdateInfo() {};
     }
 
     @Override
@@ -177,21 +176,20 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     }
 
     private void validateDeletable() {
-        if (table.primaryKeys().size() > 0) {
-            Options options = Options.fromMap(table.options());
-            if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
-                return;
-            }
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "merge engine '%s' can not support delete, 
currently only %s can support delete.",
-                            options.get(MERGE_ENGINE), 
MergeEngine.DEDUPLICATE));
-        } else {
+        if (table.primaryKeys().isEmpty()) {
             throw new UnsupportedOperationException(
                     String.format(
                             "table '%s' can not support delete, because there 
is no primary key.",
                             table.getClass().getName()));
         }
+
+        MergeEngine mergeEngine = 
CoreOptions.fromMap(table.options()).mergeEngine();
+        if (!mergeEngine.supportBatchDelete()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Merge engine %s can not support batch delete. 
Support batch delete merge engines are: %s.",
+                            mergeEngine, 
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+        }
     }
 
     private boolean canPushDownDeleteFilter() {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index df8e317d4..c2235c85b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1332,9 +1332,6 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     @ParameterizedTest
     @EnumSource(CoreOptions.MergeEngine.class)
     public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) 
throws Exception {
-        Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
-        supportUpdateEngines.add(DEDUPLICATE);
-        supportUpdateEngines.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
         // Step1: define table schema
         Map<String, String> options = new HashMap<>();
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
@@ -1361,12 +1358,17 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                 "(3, 'Euro', 119, '2022-01-02')");
 
         // Step3: prepare expected data.
-        String rowKind = mergeEngine == CoreOptions.MergeEngine.PARTIAL_UPDATE 
? "+I" : "+U";
+        String rowKind = mergeEngine == FIRST_ROW ? "+I" : "+U";
         List<Row> expectedRecords =
                 Arrays.asList(
                         // part = 2022-01-01
                         changelogRow("+I", 1L, "US Dollar", 114L, 
"2022-01-01"),
-                        changelogRow(rowKind, 2L, "Yen", 1L, "2022-01-01"),
+                        changelogRow(
+                                rowKind,
+                                2L,
+                                mergeEngine == FIRST_ROW ? "UNKNOWN" : "Yen",
+                                mergeEngine == FIRST_ROW ? -1 : 1L,
+                                "2022-01-01"),
                         changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
                         // part = 2022-01-02
                         changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
@@ -1374,15 +1376,14 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         // Step4: prepare update statement
         String updateStatement =
                 String.format(
-                        ""
-                                + "UPDATE %s "
+                        "UPDATE %s "
                                 + "SET currency = 'Yen', "
                                 + "rate = 1 "
                                 + "WHERE currency = 'UNKNOWN' and dt = 
'2022-01-01'",
                         table);
 
         // Step5: execute update statement and verify result
-        if (supportUpdateEngines.contains(mergeEngine)) {
+        if (mergeEngine.supportBatchUpdate()) {
             bEnv.executeSql(updateStatement).await();
             String querySql = String.format("SELECT * FROM %s", table);
             testBatchRead(querySql, expectedRecords);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 1926299ba..523033579 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -132,59 +132,6 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+I", 12, "v_12", "insert", "02-29")));
     }
 
-    @Test
-    public void testWorkWithPartialUpdate() throws Exception {
-        // re-create target table with given producer
-        sEnv.executeSql("DROP TABLE T");
-        prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);
-
-        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, 
database, "T");
-        action.withSourceTable("S")
-                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
-                .withMatchedUpsert(
-                        "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, 
last_action = 'matched_upsert'")
-                .withMatchedDelete("S.v IS NULL")
-                .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
-                .withNotMatchedBySourceUpsert(
-                        "dt < '02-28'", "v = v || '_nmu', last_action = 
'not_matched_upsert'")
-                .withNotMatchedBySourceDelete("dt >= '02-28'");
-
-        // delete records are filtered
-        validateActionRunResult(
-                action.build(),
-                Arrays.asList(
-                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+U", 7, "Seven", "matched_upsert", 
"02-28"),
-                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
-                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
-                        changelogRow("+I", 12, "v_12", "insert", "02-29")),
-                Arrays.asList(
-                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
-                        changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+I", 4, "v_4", "creation", "02-27"),
-                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
-                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
-                        changelogRow("+I", 7, "Seven", "matched_upsert", 
"02-28"),
-                        changelogRow("+I", 8, "v_8", "creation", "02-28"),
-                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
-                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
-                        changelogRow("+I", 10, "v_10", "creation", "02-28"),
-                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
-                        changelogRow("+I", 12, "v_12", "insert", "02-29")));
-
-        // test partial update still works after action
-        insertInto(
-                "T",
-                "(12, CAST (NULL AS STRING), '$', '02-29')",
-                "(12, 'Test', CAST (NULL AS STRING), '02-29')");
-
-        testBatchRead(
-                "SELECT * FROM T WHERE k = 12",
-                Collections.singletonList(changelogRow("+I", 12, "Test", "$", 
"02-29")));
-    }
-
     @ParameterizedTest(name = "in-default = {0}")
     @ValueSource(booleans = {true, false})
     public void testTargetAlias(boolean inDefault) throws Exception {

Reply via email to