This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 94801dd3d [flink] Refactor MergeIntoProcedure to make the api easier to use (#2145)
94801dd3d is described below

commit 94801dd3df937d94a9aa032fffc1e070a6971427
Author: yuzelin <[email protected]>
AuthorDate: Wed Oct 18 12:02:42 2023 +0800

    [flink] Refactor MergeIntoProcedure to make the api easier to use (#2145)
---
 .../paimon/flink/utils/TableEnvironmentUtils.java  |  10 +-
 .../apache/paimon/flink/action/DeleteAction.java   |   2 +-
 .../paimon/flink/action/MergeIntoAction.java       |   8 +-
 .../paimon/flink/action/TableActionBase.java       |   6 +-
 .../paimon/flink/procedure/MergeIntoProcedure.java | 245 ++++++++++++---------
 .../paimon/flink/procedure/ProcedureBase.java      |  12 +-
 .../paimon/flink/utils/TableEnvironmentUtils.java  |  10 +-
 .../paimon/flink/action/MergeIntoActionITCase.java | 181 ++++++++-------
 8 files changed, 274 insertions(+), 200 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
 
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
index b4fbd56e5..d1f4c09b4 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /** Utility methods for {@link TableEnvironment} and its subclasses. */
 public class TableEnvironmentUtils {
@@ -35,7 +34,7 @@ public class TableEnvironmentUtils {
      * Invoke {@code TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
      * from a {@link StreamTableEnvironment} instance through reflecting.
      */
-    public static void executeInternal(
+    public static TableResult executeInternal(
             StreamTableEnvironment tEnv,
             List<Transformation<?>> transformations,
             List<String> sinkIdentifierNames) {
@@ -45,10 +44,7 @@ public class TableEnvironmentUtils {
                     clazz.getDeclaredMethod("executeInternal", List.class, 
List.class);
             executeInternal.setAccessible(true);
 
-            TableResult tableResult =
-                    (TableResult)
-                            executeInternal.invoke(tEnv, transformations, 
sinkIdentifierNames);
-            tableResult.await();
+            return (TableResult) executeInternal.invoke(tEnv, transformations, 
sinkIdentifierNames);
         } catch (NoSuchMethodException e) {
             throw new RuntimeException(
                     "Failed to get 'TableEnvironmentImpl#executeInternal(List, 
List)' method "
@@ -59,8 +55,6 @@ public class TableEnvironmentUtils {
                     "Failed to invoke 
'TableEnvironmentImpl#executeInternal(List, List)' method "
                             + "from given StreamTableEnvironment instance by 
Java reflection. This is unexpected.",
                     e);
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException("Failed to wait for insert job to 
finish.", e);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 4de8e80ac..19f77fe91 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -83,6 +83,6 @@ public class DeleteAction extends TableActionBase {
                                     return rowData;
                                 });
 
-        batchSink(dataStream);
+        batchSink(dataStream).await();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 60b8a710b..3fed1f56f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -219,6 +219,11 @@ public class MergeIntoAction extends TableActionBase {
 
     @Override
     public void run() throws Exception {
+        DataStream<RowData> dataStream = buildDataStream();
+        batchSink(dataStream).await();
+    }
+
+    public DataStream<RowData> buildDataStream() {
         // handle aliases
         handleTargetAlias();
 
@@ -237,9 +242,8 @@ public class MergeIntoAction extends TableActionBase {
                         .map(Optional::get)
                         .collect(Collectors.toList());
 
-        // sink to target table
         DataStream<RowData> firstDs = dataStreams.get(0);
-        
batchSink(firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new)));
+        return 
firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new));
     }
 
     private void handleTargetAlias() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 5095cc352..06d234469 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.data.RowData;
 
 import java.util.Collections;
@@ -57,7 +58,7 @@ public abstract class TableActionBase extends ActionBase {
     }
 
     /** Sink {@link DataStream} dataStream to table with Flink Table API in batch environment. */
-    protected void batchSink(DataStream<RowData> dataStream) {
+    public TableResult batchSink(DataStream<RowData> dataStream) {
         List<Transformation<?>> transformations =
                 Collections.singletonList(
                         new FlinkSinkBuilder((FileStoreTable) table)
@@ -67,7 +68,8 @@ public abstract class TableActionBase extends ActionBase {
 
         List<String> sinkIdentifierNames = 
Collections.singletonList(identifier.getFullName());
 
-        TableEnvironmentUtils.executeInternal(batchTEnv, transformations, 
sinkIdentifierNames);
+        return TableEnvironmentUtils.executeInternal(
+                batchTEnv, transformations, sinkIdentifierNames);
     }
 
     /**
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index e2c14e0c3..e6658346b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -23,18 +23,14 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.MergeIntoAction;
 import org.apache.paimon.flink.action.MergeIntoActionFactory;
 
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
@@ -42,29 +38,62 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
  * Merge Into procedure. Usage:
  *
  * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
+ *  -- when matched then upsert
  *  CALL sys.merge_into(
  *      'targetTableId',
  *      'targetAlias',
  *      'sourceSqls', -- separate with ';'
  *      'sourceTable',
  *      'mergeCondition',
- *      'mergeActions',
- *      // arguments for merge actions
- *      ...
- *      )
+ *      'matchedUpsertCondition',
+ *      'matchedUpsertSetting'
+ *  )
  *
- *  -- merge actions and corresponding arguments
- *  matched-upsert: condition, set
- *  not-matched-by-source-upsert: condition, set
- *  matched-delete: condition
- *  not-matched-by-source-delete: condition
- *  not-matched-insert: condition, insertValues
+ *  -- when matched then upsert + when not matched then insert
+ *  CALL sys.merge_into(
+ *      'targetTableId',
+ *      'targetAlias',
+ *      'sourceSqls',
+ *      'sourceTable',
+ *      'mergeCondition',
+ *      'matchedUpsertCondition',
+ *      'matchedUpsertSetting',
+ *      'notMatchedInsertCondition',
+ *      'notMatchedInsertValues'
+ *  )
  *
- *  -- NOTE: the arguments should be in the order of merge actions
- *  -- and use '' as placeholder for optional arguments
+ *  -- above + when matched then delete
+ *  -- IMPORTANT: Use 'TRUE' if you want to delete data without filter condition.
+ *  -- If matchedDeleteCondition='', it will ignore matched-delete action!
+ *  CALL sys.merge_into(
+ *      'targetTableId',
+ *      'targetAlias',
+ *      'sourceSqls',
+ *      'sourceTable',
+ *      'mergeCondition',
+ *      'matchedUpsertCondition',
+ *      'matchedUpsertSetting',
+ *      'notMatchedInsertCondition',
+ *      'notMatchedInsertValues',
+ *      'matchedDeleteCondition'
+ *  )
+ *
+ *  -- when matched then delete (short form)
+ *  CALL sys.merge_into(
+ *      'targetTableId',
+ *      'targetAlias',
+ *      'sourceSqls',
+ *      'sourceTable',
+ *      'mergeCondition',
+ *      'matchedDeleteCondition'
+ *  )
  * </code></pre>
  *
- * <p>This procedure will be forced to use batch environments
+ * <p>This procedure will be forced to use batch environments. Compared to {@link MergeIntoAction},
+ * this procedure doesn't provide arguments to control not-matched-by-source behavior because they
+ * are not commonly used and would make the methods too complex to use.
  */
 public class MergeIntoProcedure extends ProcedureBase {
 
@@ -77,9 +106,81 @@ public class MergeIntoProcedure extends ProcedureBase {
             String sourceSqls,
             String sourceTable,
             String mergeCondition,
-            String mergeActions,
-            String... mergeActionArguments)
-            throws Exception {
+            String matchedUpsertCondition,
+            String matchedUpsertSetting) {
+        return call(
+                procedureContext,
+                targetTableId,
+                targetAlias,
+                sourceSqls,
+                sourceTable,
+                mergeCondition,
+                matchedUpsertCondition,
+                matchedUpsertSetting,
+                "",
+                "",
+                "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String targetTableId,
+            String targetAlias,
+            String sourceSqls,
+            String sourceTable,
+            String mergeCondition,
+            String matchedUpsertCondition,
+            String matchedUpsertSetting,
+            String notMatchedInsertCondition,
+            String notMatchedInsertValues) {
+        return call(
+                procedureContext,
+                targetTableId,
+                targetAlias,
+                sourceSqls,
+                sourceTable,
+                mergeCondition,
+                matchedUpsertCondition,
+                matchedUpsertSetting,
+                notMatchedInsertCondition,
+                notMatchedInsertValues,
+                "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String targetTableId,
+            String targetAlias,
+            String sourceSqls,
+            String sourceTable,
+            String mergeCondition,
+            String matchedDeleteCondition) {
+        return call(
+                procedureContext,
+                targetTableId,
+                targetAlias,
+                sourceSqls,
+                sourceTable,
+                mergeCondition,
+                "",
+                "",
+                "",
+                "",
+                matchedDeleteCondition);
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String targetTableId,
+            String targetAlias,
+            String sourceSqls,
+            String sourceTable,
+            String mergeCondition,
+            String matchedUpsertCondition,
+            String matchedUpsertSetting,
+            String notMatchedInsertCondition,
+            String notMatchedInsertValues,
+            String matchedDeleteCondition) {
         String warehouse = ((AbstractCatalog) catalog).warehouse();
         Map<String, String> catalogOptions = ((AbstractCatalog) 
catalog).options();
         Identifier identifier = Identifier.fromString(targetTableId);
@@ -101,88 +202,32 @@ public class MergeIntoProcedure extends ProcedureBase {
         checkArgument(!mergeCondition.isEmpty(), "Must specify merge 
condition.");
         action.withMergeCondition(mergeCondition);
 
-        checkArgument(!mergeActions.isEmpty(), "Must specify at least one 
merge action.");
-        List<String> actions =
-                Arrays.stream(mergeActions.split(","))
-                        .map(String::trim)
-                        .collect(Collectors.toList());
-        validateActions(actions, mergeActionArguments.length);
-
-        int index = 0;
-        String condition, setting;
-        for (String mergeAction : actions) {
-            switch (mergeAction) {
-                case MATCHED_UPSERT:
-                case NOT_MATCHED_BY_SOURCE_UPSERT:
-                case NOT_MATCHED_INSERT:
-                    condition = nullable(mergeActionArguments[index++]);
-                    setting = nullable(mergeActionArguments[index++]);
-                    checkNotNull(setting, "%s must set the second argument", 
mergeAction);
-                    setMergeAction(action, mergeAction, condition, setting);
-                    break;
-                case MATCHED_DELETE:
-                case NOT_MATCHED_BY_SOURCE_DELETE:
-                    condition = nullable(mergeActionArguments[index++]);
-                    setMergeAction(action, mergeAction, condition);
-                    break;
-                default:
-                    throw new UnsupportedOperationException("Unknown merge 
action: " + action);
-            }
+        if (!matchedUpsertCondition.isEmpty() || 
!matchedUpsertSetting.isEmpty()) {
+            String condition = nullable(matchedUpsertCondition);
+            String setting = nullable(matchedUpsertSetting);
+            checkNotNull(setting, "matched-upsert must set the 
'matchedUpsertSetting' argument");
+            action.withMatchedUpsert(condition, setting);
         }
 
-        MergeIntoActionFactory.validate(action);
-
-        // TODO set dml-sync argument to action
-        action.run();
-
-        return new String[] {"Success"};
-    }
+        if (!notMatchedInsertCondition.isEmpty() || 
!notMatchedInsertValues.isEmpty()) {
+            String condition = nullable(notMatchedInsertCondition);
+            String values = nullable(notMatchedInsertValues);
+            checkNotNull(
+                    values, "not-matched-insert must set the 
'notMatchedInsertValues' argument");
+            action.withNotMatchedInsert(condition, values);
+        }
 
-    private void validateActions(List<String> mergeActions, int 
argumentLength) {
-        int expectedArguments = 0;
-        for (String action : mergeActions) {
-            switch (action) {
-                case MATCHED_UPSERT:
-                case NOT_MATCHED_BY_SOURCE_UPSERT:
-                case NOT_MATCHED_INSERT:
-                    expectedArguments += 2;
-                    break;
-                case MATCHED_DELETE:
-                case NOT_MATCHED_BY_SOURCE_DELETE:
-                    expectedArguments += 1;
-                    break;
-                default:
-                    throw new UnsupportedOperationException("Unknown merge 
action: " + action);
-            }
+        if (!matchedDeleteCondition.isEmpty()) {
+            action.withMatchedDelete(matchedDeleteCondition);
         }
 
-        checkArgument(
-                expectedArguments == argumentLength,
-                "Expected %s action arguments but given '%s'",
-                expectedArguments,
-                argumentLength);
-    }
+        MergeIntoActionFactory.validate(action);
 
-    private void setMergeAction(MergeIntoAction action, String mergeAction, 
String... arguments) {
-        switch (mergeAction) {
-            case MATCHED_UPSERT:
-                action.withMatchedUpsert(arguments[0], arguments[1]);
-                return;
-            case NOT_MATCHED_BY_SOURCE_UPSERT:
-                action.withNotMatchedBySourceUpsert(arguments[0], 
arguments[1]);
-                return;
-            case NOT_MATCHED_INSERT:
-                action.withNotMatchedInsert(arguments[0], arguments[1]);
-                return;
-            case MATCHED_DELETE:
-                action.withMatchedDelete(arguments[0]);
-                return;
-            case NOT_MATCHED_BY_SOURCE_DELETE:
-                action.withNotMatchedBySourceDelete(arguments[0]);
-                return;
-            default:
-                throw new UnsupportedOperationException("Unknown merge action: 
" + mergeAction);
-        }
+        DataStream<RowData> dataStream = action.buildDataStream();
+        TableResult tableResult = action.batchSink(dataStream);
+        JobClient jobClient = tableResult.getJobClient().get();
+
+        return execute(procedureContext, jobClient);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 82f869f28..91b8a1aa1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -73,8 +73,18 @@ public abstract class ProcedureBase implements Procedure, 
Factory {
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         String name = 
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
         JobClient jobClient = env.executeAsync(name);
+        return execute(jobClient, conf.get(TABLE_DML_SYNC));
+    }
+
+    protected String[] execute(ProcedureContext procedureContext, JobClient 
jobClient) {
+        StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        return execute(jobClient, conf.get(TABLE_DML_SYNC));
+    }
+
+    private String[] execute(JobClient jobClient, boolean dmlSync) {
         String jobId = jobClient.getJobID().toString();
-        if (conf.get(TABLE_DML_SYNC)) {
+        if (dmlSync) {
             try {
                 jobClient.getJobExecutionResult().get();
             } catch (Exception e) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
index 9b7009387..6a5820ce8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /** Utility methods for {@link TableEnvironment} and its subclasses. */
 public class TableEnvironmentUtils {
@@ -35,7 +34,7 @@ public class TableEnvironmentUtils {
      * Invoke {@code TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
      * from a {@link StreamTableEnvironment} instance through reflecting.
      */
-    public static void executeInternal(
+    public static TableResult executeInternal(
             StreamTableEnvironment tEnv,
             List<Transformation<?>> transformations,
             List<String> sinkIdentifierNames) {
@@ -45,10 +44,7 @@ public class TableEnvironmentUtils {
                     clazz.getDeclaredMethod("executeInternal", List.class, 
List.class);
             executeInternal.setAccessible(true);
 
-            TableResult tableResult =
-                    (TableResult)
-                            executeInternal.invoke(tEnv, transformations, 
sinkIdentifierNames);
-            tableResult.await();
+            return (TableResult) executeInternal.invoke(tEnv, transformations, 
sinkIdentifierNames);
         } catch (NoSuchMethodException e) {
             throw new RuntimeException(
                     "Failed to get 'TableEnvironmentImpl#executeInternal(List, 
List)' method "
@@ -59,8 +55,6 @@ public class TableEnvironmentUtils {
                     "Failed to invoke 
'TableEnvironmentImpl#executeInternal(List, List)' method "
                             + "from given StreamTableEnvironment instance by 
Java reflection. This is unexpected.",
                     e);
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException("Failed to wait for insert job to 
finish.", e);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 53cc1f6b6..7d6ce1f33 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -40,11 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
-import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
@@ -116,31 +111,8 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         "dt < '02-28'", "v = v || '_nmu', last_action = 
'not_matched_upsert'")
                 .withNotMatchedBySourceDelete("dt >= '02-28'");
 
-        String mergeActions =
-                String.format(
-                        "%s,%s,%s,%s,%s",
-                        MATCHED_UPSERT,
-                        MATCHED_DELETE,
-                        NOT_MATCHED_INSERT,
-                        NOT_MATCHED_BY_SOURCE_UPSERT,
-                        NOT_MATCHED_BY_SOURCE_DELETE);
-        String procedureStatement =
-                String.format(
-                        "CALL sys.merge_into('%s.T', '', '', 'default.S', 'T.k 
= S.k AND T.dt = S.dt', '%s', %s)",
-                        database,
-                        mergeActions,
-                        "'T.v <> S.v AND S.v IS NOT NULL', "
-                                + "'v = S.v, last_action = 
''matched_upsert''', "
-                                + "'S.v IS NULL', "
-                                + "'', "
-                                + "'S.k, S.v, ''insert'', S.dt', "
-                                + "'dt < ''02-28''', "
-                                + "'v = v || ''_nmu'', last_action = 
''not_matched_upsert''', "
-                                + "'dt >= ''02-28'''");
-
         validateActionRunResult(
                 action,
-                procedureStatement,
                 expected,
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
@@ -200,15 +172,15 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
 
         String procedureStatement =
                 String.format(
-                        "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k = 
S.k AND TT.dt = S.dt', '%s', %s)",
-                        inDefault ? database : "test_db", MATCHED_DELETE, 
"'S.v IS NULL'");
+                        "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k = 
S.k AND TT.dt = S.dt', 'S.v IS NULL')",
+                        inDefault ? database : "test_db");
 
-        validateActionRunResult(
-                action,
-                procedureStatement,
+        List<Row> streamingExpected =
                 Arrays.asList(
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
-                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28"));
+
+        List<Row> batchExpected =
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
                         changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -217,7 +189,13 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+I", 6, "v_6", "creation", "02-28"),
                         changelogRow("+I", 7, "v_7", "creation", "02-28"),
                         changelogRow("+I", 9, "v_9", "creation", "02-28"),
-                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            validateActionRunResult(action, streamingExpected, batchExpected);
+        } else {
+            validateProcedureResult(procedureStatement, streamingExpected, 
batchExpected);
+        }
     }
 
     @ParameterizedTest(name = "in-default = {0}")
@@ -242,20 +220,20 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
 
         String procedureStatement =
                 String.format(
-                        "CALL sys.merge_into('default.T', '', '', '%s', 'T.k = 
S.k AND T.dt = S.dt', '%s', %s)",
-                        sourceTableName, MATCHED_DELETE, "'S.v IS NULL'");
+                        "CALL sys.merge_into('default.T', '', '', '%s', 'T.k = 
S.k AND T.dt = S.dt', 'S.v IS NULL')",
+                        sourceTableName);
 
         if (!inDefault) {
             sEnv.executeSql("USE `default`");
             bEnv.executeSql("USE `default`");
         }
 
-        validateActionRunResult(
-                action,
-                procedureStatement,
+        List<Row> streamingExpected =
                 Arrays.asList(
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
-                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28"));
+
+        List<Row> batchExpected =
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
                         changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -264,7 +242,13 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+I", 6, "v_6", "creation", "02-28"),
                         changelogRow("+I", 7, "v_7", "creation", "02-28"),
                         changelogRow("+I", 9, "v_9", "creation", "02-28"),
-                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            validateActionRunResult(action, streamingExpected, batchExpected);
+        } else {
+            validateProcedureResult(procedureStatement, streamingExpected, 
batchExpected);
+        }
     }
 
     @ParameterizedTest(name = "useCatalog = {0}")
@@ -306,23 +290,21 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
 
         String procedureStatement =
                 String.format(
-                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
S.k AND T.dt = S.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
S.k AND T.dt = S.dt', 'S.v IS NULL')",
                         database,
                         useCatalog
                                 ? String.format(
                                         "%s;%s;%s",
                                         escapeCatalog, "USE CATALOG test_cat", 
escapeDdl)
                                 : String.format("%s;%s", escapeCatalog, 
escapeDdl),
-                        useCatalog ? "S" : "test_cat.default.S",
-                        MATCHED_DELETE,
-                        "'S.v IS NULL'");
+                        useCatalog ? "S" : "test_cat.default.S");
 
-        validateActionRunResult(
-                action,
-                procedureStatement,
+        List<Row> streamingExpected =
                 Arrays.asList(
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
-                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28"));
+
+        List<Row> batchExpected =
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
                         changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -331,7 +313,13 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+I", 6, "v_6", "creation", "02-28"),
                         changelogRow("+I", 7, "v_7", "creation", "02-28"),
                         changelogRow("+I", 9, "v_9", "creation", "02-28"),
-                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            validateActionRunResult(action, streamingExpected, batchExpected);
+        } else {
+            validateProcedureResult(procedureStatement, streamingExpected, 
batchExpected);
+        }
     }
 
     @ParameterizedTest(name = "source-qualified = {0}")
@@ -346,16 +334,12 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
 
         String procedureStatement =
                 String.format(
-                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
SS.k AND T.dt = SS.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
SS.k AND T.dt = SS.dt', '', '*')",
                         database,
                         "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', 
dt FROM S",
-                        qualified ? "default.SS" : "SS",
-                        MATCHED_UPSERT,
-                        "'', '*'");
+                        qualified ? "default.SS" : "SS");
 
-        validateActionRunResult(
-                action,
-                procedureStatement,
+        List<Row> streamingExpected =
                 Arrays.asList(
                         changelogRow("-U", 1, "v_1", "creation", "02-27"),
                         changelogRow("+U", 1, "v_1", "unknown", "02-27"),
@@ -364,7 +348,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
                         changelogRow("-U", 7, "v_7", "creation", "02-28"),
                         changelogRow("+U", 7, "Seven", "unknown", "02-28"),
                         changelogRow("-U", 8, "v_8", "creation", "02-28"),
-                        changelogRow("+U", 8, null, "unknown", "02-28")),
+                        changelogRow("+U", 8, null, "unknown", "02-28"));
+
+        List<Row> batchExpected =
                 Arrays.asList(
                         changelogRow("+U", 1, "v_1", "unknown", "02-27"),
                         changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -375,7 +361,13 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+U", 7, "Seven", "unknown", "02-28"),
                         changelogRow("+U", 8, null, "unknown", "02-28"),
                         changelogRow("+I", 9, "v_9", "creation", "02-28"),
-                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            validateActionRunResult(action, streamingExpected, batchExpected);
+        } else {
+            validateProcedureResult(procedureStatement, streamingExpected, 
batchExpected);
+        }
     }
 
     @ParameterizedTest(name = "source-qualified = {0}")
@@ -390,19 +382,17 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
 
         String procedureStatement =
                 String.format(
-                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
SS.k AND T.dt = SS.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
SS.k AND T.dt = SS.dt', '', '', 'SS.k < 12', '*', '')",
                         database,
                         "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', 
dt FROM S",
-                        qualified ? "default.SS" : "SS",
-                        NOT_MATCHED_INSERT,
-                        "'SS.k < 12', '*'");
+                        qualified ? "default.SS" : "SS");
 
-        validateActionRunResult(
-                action,
-                procedureStatement,
+        List<Row> streamingExpected =
                 Arrays.asList(
                         changelogRow("+I", 8, "v_8", "unknown", "02-29"),
-                        changelogRow("+I", 11, "v_11", "unknown", "02-29")),
+                        changelogRow("+I", 11, "v_11", "unknown", "02-29"));
+
+        List<Row> batchExpected =
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
                         changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -415,7 +405,36 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+I", 9, "v_9", "creation", "02-28"),
                         changelogRow("+I", 10, "v_10", "creation", "02-28"),
                         changelogRow("+I", 8, "v_8", "unknown", "02-29"),
-                        changelogRow("+I", 11, "v_11", "unknown", "02-29")));
+                        changelogRow("+I", 11, "v_11", "unknown", "02-29"));
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            validateActionRunResult(action, streamingExpected, batchExpected);
+        } else {
+            validateProcedureResult(procedureStatement, streamingExpected, 
batchExpected);
+        }
+    }
+
+    @Test
+    public void testProcedureWithDeleteConditionTrue() throws Exception {
+        String procedureStatement =
+                String.format(
+                        "CALL sys.merge_into('%s.T', '', '', 'S', 'T.k = S.k 
AND T.dt = S.dt', 'TRUE')",
+                        database);
+
+        validateProcedureResult(
+                procedureStatement,
+                Arrays.asList(
+                        changelogRow("-D", 1, "v_1", "creation", "02-27"),
+                        changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                        changelogRow("-D", 7, "v_7", "creation", "02-28"),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
+                Arrays.asList(
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
     }
 
     // 
----------------------------------------------------------------------------------------------------------------
@@ -493,18 +512,11 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
     }
 
     private void validateActionRunResult(
-            MergeIntoAction action,
-            String procedureStatement,
-            List<Row> streamingExpected,
-            List<Row> batchExpected)
+            MergeIntoAction action, List<Row> streamingExpected, List<Row> 
batchExpected)
             throws Exception {
         BlockingIterator<Row, Row> iterator =
                 testStreamingRead(buildSimpleQuery("T"), initialRecords);
-        if (ThreadLocalRandom.current().nextBoolean()) {
-            action.run();
-        } else {
-            callProcedure(procedureStatement);
-        }
+        action.run();
         // test streaming read
         validateStreamingReadResult(iterator, streamingExpected);
         iterator.close();
@@ -512,6 +524,19 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
         testBatchRead(buildSimpleQuery("T"), batchExpected);
     }
 
+    private void validateProcedureResult(
+            String procedureStatement, List<Row> streamingExpected, List<Row> 
batchExpected)
+            throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                testStreamingRead(buildSimpleQuery("T"), initialRecords);
+        callProcedure(procedureStatement, true, true);
+        // test batch read first to ensure TABLE_DML_SYNC works
+        testBatchRead(buildSimpleQuery("T"), batchExpected);
+        // test streaming read
+        validateStreamingReadResult(iterator, streamingExpected);
+        iterator.close();
+    }
+
     private void prepareTargetTable(CoreOptions.ChangelogProducer producer) 
throws Exception {
         sEnv.executeSql(
                 buildDdl(


Reply via email to