This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe06f73 [FLINK-31415] Support target alias and using ddls to create 
source
3fe06f73 is described below

commit 3fe06f7384bb8ce5fab5997b5d0e81761874f092
Author: yuzelin <[email protected]>
AuthorDate: Tue Mar 14 09:30:41 2023 +0800

    [FLINK-31415] Support target alias and using ddls to create source
    
    This closes #596
---
 docs/content/docs/how-to/writing-tables.md         |  74 ++++++++++-
 .../table/store/tests/FlinkActionsE2eTest.java     |   4 +-
 .../table/store/connector/action/ActionBase.java   |   5 +-
 .../store/connector/action/MergeIntoAction.java    | 146 +++++++++++++--------
 .../connector/action/MergeIntoActionITCase.java    | 111 +++++++++-------
 5 files changed, 235 insertions(+), 105 deletions(-)

diff --git a/docs/content/docs/how-to/writing-tables.md 
b/docs/content/docs/how-to/writing-tables.md
index 2d3d0deb..2c3d693d 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -314,7 +314,9 @@ Run the following command to submit a 'merge-into' job for 
the table.
     --warehouse <warehouse-path> \
     --database <database-name> \
     --table <target-table> \
-    --using-table <source-table> \
+    [--target-as <target-table-alias>] \
+    --source-table <source-table> \
+    [--source-as <source-table-alias>] \
     --on <merge-condition> \
     --merge-actions 
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>
 \
     --matched-upsert-condition <matched-condition> \
@@ -326,6 +328,8 @@ Run the following command to submit a 'merge-into' job for 
the table.
     --not-matched-by-source-upsert-set <not-matched-upsert-changes> \
     --not-matched-by-source-delete-condition <not-matched-by-source-condition>
     
+Alternatively, you can use '--source-sql <sql> [, --source-sql <sql> ...]' to 
create a new table as source table at runtime.
+    
 -- Examples:
 -- Find all orders mentioned in the source table, then mark as important if 
the price is above 100 
 -- or delete if the price is under 10.
@@ -336,12 +340,12 @@ Run the following command to submit a 'merge-into' job 
for the table.
     --warehouse <warehouse-path> \
     --database <database-name> \
     --table T \
-    --using-table S \
+    --source-table S \
     --on "T.id = S.order_id" \
     --merge-actions \
     matched-upsert,matched-delete \
     --matched-upsert-condition "T.price > 100" \
-    --matched-upsert-set "id = T.id, price = T.price, mark = 'important'" \
+    --matched-upsert-set "mark = 'important'" \
     --matched-delete-condition "T.price < 10" 
     
 -- For matched order rows, increase the price, and if there is no match, 
insert the order from the 
@@ -353,11 +357,11 @@ Run the following command to submit a 'merge-into' job 
for the table.
     --warehouse <warehouse-path> \
     --database <database-name> \
     --table T \
-    --using-table S \
+    --source-table S \
     --on "T.id = S.order_id" \
     --merge-actions \
     matched-upsert,not-matched-insert \
-    --matched-upsert-set "id = T.id, price = T.price + 20, mark = T.mark" \
+    --matched-upsert-set "price = T.price + 20" \
     --not-matched-insert-values * 
 
 -- For not matched by source order rows (which are in the target table and 
does not match any row in the
@@ -369,15 +373,71 @@ Run the following command to submit a 'merge-into' job 
for the table.
     --warehouse <warehouse-path> \
     --database <database-name> \
     --table T \
-    --using-table S \
+    --source-table S \
     --on "T.id = S.order_id" \
     --merge-actions \
     not-matched-by-source-upsert,not-matched-by-source-delete \
     --not-matched-by-source-upsert-condition "T.mark <> 'trivial'" \
-    --not-matched-by-source-upsert-set "id = T.id, price = T.price - 20, mark 
= T.mark" \
+    --not-matched-by-source-upsert-set "price = T.price - 20" \
     --not-matched-by-source-delete-condition "T.mark = 'trivial'"
+    
+-- A source-sql example: 
+-- Create a temporary view S in new catalog and use it as source table
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --source-sql "CREATE CATALOG test WITH (...)" \
+    --source-sql "USE CATALOG test" \
+    --source-sql "USE DATABASE default" \
+    --source-sql "CREATE TEMPORARY VIEW S AS SELECT order_id, price, 
'important' FROM important_order" \
+    --source-as test.default.S \
+    --on "T.id = S.order_id" \
+    --merge-actions not-matched-insert\
+    --not-matched-insert-values *
 ```
 
+Explanation of the term 'matched':
+1. matched: changed rows are from target table and each can match a source 
table row based on 
+merge-condition and optional matched-condition (source ∩ target).
+2. not-matched: changed rows are from source table and none of them can match 
any target table 
+row based on merge-condition and optional not-matched-condition (source - 
target).
+3. not-matched-by-source: changed rows are from target table and none of them 
can match any source 
+table row based on merge-condition and optional 
not-matched-by-source-condition (target - source).
+
+Parameters format:\
+All conditions, set changes and values should use Flink SQL syntax. Please 
quote them
+with \" to escape special characters.
+1. matched-upsert-changes:\
+col = <source-table>.col | expression [, ...] (Means set target.col with given 
value. Do not 
+add '<target-table>.' before 'col'.)\
+Especially, you can use '*' to set columns with all source columns (require 
target table's 
+schema is equal to source's).
+2. not-matched-upsert-changes is similar to matched-upsert-changes, but you 
cannot reference 
+source table's column or use '*'.
+3. insert-values:\
+col1,col2,...,col_end\ 
+Must specify values of all columns. For each column, you can reference 
<source-table>.col or 
+use an expression.\
+Especially, you can use '*' to insert with all source columns (require target 
table's schema 
+is equal to source's).
+4. not-matched-condition cannot use target table's columns to construct 
condition expression.
+5. not-matched-by-source-condition cannot use source table's columns to 
construct condition expression.
+
+{{< hint warning >}}
+1. source-alias cannot duplicate an existing table name. If you use 
--source-sql, source-alias 
+must be specified and equal to the table name in the "CREATE" statement.
+2. If the source table is not in the same place as target table, the 
source-table-name or the source-alias 
+should be qualified (database.table or catalog.database.table if in different 
catalog).
+3. At least one merge action must be specified.
+4. If both matched-upsert and matched-delete actions are present, their 
conditions must both be present too 
+(same to not-matched-by-source-upsert and not-matched-by-source-delete). 
Otherwise, all conditions are optional.
+
+{{< /hint >}}
+
 For more information of 'merge-into', see
 
 ```bash
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 7b1e2248..3dfb7c7d 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -222,6 +222,8 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                 jobManager.execInContainer(
                         "bin/flink",
                         "run",
+                        "-p",
+                        "1",
                         "-c",
                         
"org.apache.flink.table.store.connector.action.FlinkActions",
                         "--detached",
@@ -301,7 +303,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                         "default",
                         "--table",
                         "T",
-                        "--using-table",
+                        "--source-table",
                         "S",
                         "--on",
                         "T.k=S.k",
diff --git 
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
 
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
index d0750c09..0384e8a6 100644
--- 
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
+++ 
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE;
@@ -59,6 +60,8 @@ public abstract class ActionBase implements Action {
 
     protected final FlinkCatalog flinkCatalog;
 
+    protected final String catalogName = "table-store-" + UUID.randomUUID();
+
     protected StreamExecutionEnvironment env;
 
     protected StreamTableEnvironment tEnv;
@@ -77,7 +80,7 @@ public abstract class ActionBase implements Action {
                 CatalogFactory.createCatalog(
                         CatalogContext.create(
                                 new Options().set(CatalogOptions.WAREHOUSE, 
warehouse)));
-        flinkCatalog = new FlinkCatalog(catalog, "table-store", 
DEFAULT_DATABASE);
+        flinkCatalog = new FlinkCatalog(catalog, catalogName, 
DEFAULT_DATABASE);
 
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
diff --git 
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
 
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
index 93de9d26..9943bfdb 100644
--- 
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
+++ 
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.store.connector.LogicalTypeConversion;
-import org.apache.flink.table.store.file.catalog.Identifier;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.types.DataField;
 import org.apache.flink.table.store.types.DataType;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -87,10 +87,12 @@ public class MergeIntoAction extends ActionBase {
     // field names of target table
     private final List<String> targetFieldNames;
 
-    // source
+    // target table
+    @Nullable private String targetAlias;
+
+    // source table
     @Nullable private String sourceTable;
-    private Identifier sourceTableIdentifier;
-    @Nullable private String source;
+    @Nullable private String[] sourceSqls;
     @Nullable private String sourceAlias;
 
     // merge condition
@@ -150,14 +152,20 @@ public class MergeIntoAction extends ActionBase {
                         .collect(Collectors.toList());
     }
 
-    public MergeIntoAction withSourceTable(String sourceTable) {
+    public MergeIntoAction withTargetAlias(String targetAlias) {
+        this.targetAlias = targetAlias;
+        return this;
+    }
+
+    public MergeIntoAction withSourceTable(@Nullable String sourceAlias, 
String sourceTable) {
+        this.sourceAlias = sourceAlias;
         this.sourceTable = sourceTable;
         return this;
     }
 
-    public MergeIntoAction withSource(String source, String sourceAlias) {
-        this.source = source;
+    public MergeIntoAction withSourceSqls(String sourceAlias, String... 
sourceSqls) {
         this.sourceAlias = sourceAlias;
+        this.sourceSqls = sourceSqls;
         return this;
     }
 
@@ -221,6 +229,10 @@ public class MergeIntoAction extends ActionBase {
 
         MergeIntoAction action = new MergeIntoAction(tablePath.f0, 
tablePath.f1, tablePath.f2);
 
+        if (params.has("target-as")) {
+            action.withTargetAlias(params.get("target-as"));
+        }
+
         if (!initSource(params, action)) {
             return Optional.empty();
         }
@@ -273,30 +285,30 @@ public class MergeIntoAction extends ActionBase {
     }
 
     private static boolean initSource(MultipleParameterTool params, 
MergeIntoAction action) {
-        String sourceTable = params.get("using-table");
-        String source = params.get("using-source");
-        String sourceAlias = params.get("as");
+        String sourceTable = params.get("source-table");
+        Collection<String> sourceSqls = params.getMultiParameter("source-sql");
+        String sourceAlias = params.get("source-as");
 
         int count = 0;
         if (sourceTable != null) {
-            action.withSourceTable(params.get("using-table"));
+            action.withSourceTable(sourceAlias, sourceTable);
             count++;
         }
 
-        if (source != null) {
+        if (sourceSqls != null) {
             if (sourceAlias == null) {
                 System.err.println(
-                        "The source and its alias must be specified 
together.\n"
+                        "--source-sql and --source-as must be specified 
together.\n"
                                 + "Run <action> --help for help.");
                 return false;
             }
-            action.withSource(source, sourceAlias);
+            action.withSourceSqls(sourceAlias, sourceSqls.toArray(new 
String[0]));
             count++;
         }
 
         if (count != 1) {
             System.err.println(
-                    "Please specify either \"source table\" or \"source, 
source's alias\".\n"
+                    "Please specify source as either \"source-table\" or 
\"source-sql\".\n"
                             + "Run <action> --help for help.");
             return false;
         }
@@ -359,8 +371,10 @@ public class MergeIntoAction extends ActionBase {
         System.out.println(
                 "  merge-into --warehouse <warehouse-path>\n"
                         + "             --database <database-name>\n"
-                        + "             --table <table-name>\n"
-                        + "             --using-table <source-table>\n"
+                        + "             --table <target-table-name>\n"
+                        + "             [--target-as <target-table-alias>]\n"
+                        + "             --source-table <source-table>\n"
+                        + "             [--source-as <source-table-alias>]\n"
                         + "             --on <merge-condition>\n"
                         + "             --merge-actions 
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>\n"
                         + "             --matched-upsert-condition 
<matched-condition>\n"
@@ -371,35 +385,45 @@ public class MergeIntoAction extends ActionBase {
                         + "             
--not-matched-by-source-upsert-condition <not-matched-by-source-condition>\n"
                         + "             --not-matched-by-source-upsert-set 
<not-matched-upsert-changes>\n"
                         + "             
--not-matched-by-source-delete-condition <not-matched-by-source-condition>");
+
         System.out.println("  matched-upsert-changes format:");
         System.out.println(
                 "    col=<source-table>.col | expression [, ...] (do not add 
'<target-table>.' before 'col')");
         System.out.println(
                 "    * (upsert with all source cols; require target table's 
schema is equal to source's)");
+
         System.out.println("  not-matched-upsert-changes format:");
         System.out.println("    col=expression (cannot use source table's 
col)");
+
         System.out.println("  insert-values format:");
         System.out.println(
                 "    col1,col2,...,col_end (must specify values of all 
columns; can use <source-table>.col or expression)");
         System.out.println(
                 "    * (insert with all source cols; require target table's 
schema is equal to source's)");
+
         System.out.println(
                 "  not-matched-condition: cannot use target table's columns to 
construct condition expression.");
         System.out.println(
                 "  not-matched-by-source-condition: cannot use source table's 
columns to construct condition expression.");
+
         System.out.println("  alternative arguments:");
         System.out.println("    --path <table-path> to represent the table 
path.");
         System.out.println(
-                "    --using-source <source-expression> --as <source-alias> to 
replace --using-table.");
+                "    --source-sql <sql>[, --source-sql <sql> ...] can create a 
new table as source table at runtime.");
         System.out.println();
 
         System.out.println("Note: ");
         System.out.println("  1. Target table must has primary keys.");
         System.out.println(
                 "  2. All conditions, set changes and values should use Flink 
SQL syntax. Please quote them with \" to escape special characters.");
-        System.out.println("  3. source-alias cannot be duplicated with 
existed table name.");
-        System.out.println("  4. At least one merge action must be 
specified.");
-        System.out.println("  5. How to determine the changed rows with 
different \"matched\":");
+        System.out.println(
+                "  3. source-alias cannot be duplicated with existed table 
name. "
+                        + "If you use --source-sql, source-alias must be 
specified and equal to the table name in \"CREATE\" statement.");
+        System.out.println(
+                "  4. If the source table is not in the same place as target 
table, "
+                        + "the source-table-name or the source-alias should be 
qualified (database.table or catalog.database.table if in different catalog).");
+        System.out.println("  5. At least one merge action must be 
specified.");
+        System.out.println("  6. How to determine the changed rows with 
different \"matched\":");
         System.out.println(
                 "    matched: changed rows are from target table and each can 
match a source table row "
                         + "based on merge-condition and optional 
matched-condition.");
@@ -410,36 +434,42 @@ public class MergeIntoAction extends ActionBase {
                 "    not-matched-by-source: changed rows are from target table 
and all row cannot match any source table row "
                         + "based on merge-condition and optional 
not-matched-by-source-condition.");
         System.out.println(
-                "  6. If both matched-upsert and matched-delete actions are 
present, their conditions must both be present too "
+                "  7. If both matched-upsert and matched-delete actions are 
present, their conditions must both be present too "
                         + "(same to not-matched-by-source-upsert and 
not-matched-by-source-delete). Otherwise, all conditions are optional.");
-        System.out.println("  7. source-alias cannot be duplicated with 
existed table name.");
         System.out.println();
 
         System.out.println("Examples:");
         System.out.println(
                 "  merge-into --path hdfs:///path/to/T\n"
-                        + "             --using-table S\n"
-                        + "             --on \"T.k=S.k\"\n"
+                        + "             --source-table S\n"
+                        + "             --on \"T.k = S.k\"\n"
                         + "             --merge-actions matched-upsert\n"
-                        + "             --matched-upsert-condition 
\"T.v<>S.v\"\n"
-                        + "             --matched-upsert-set \"S.k,S.v\"");
+                        + "             --matched-upsert-condition \"T.v <> 
S.v\"\n"
+                        + "             --matched-upsert-set \"v = S.v\"");
         System.out.println(
-                "  It will find matched rows of target table that meet 
condition (T.k=S.k AND T.v<>S.v) ind '-D' to test_table, then update T.v with 
S.v.");
+                "  It will find matched rows of target table that meet 
condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
     }
 
     @Override
     public void run() throws Exception {
+        // handle target alias
+        if (targetAlias != null) {
+            tEnv.registerTable(targetAlias, 
tEnv.from(identifier.getObjectName()));
+        }
+
         // prepare source table
         if (sourceTable != null) {
-            if (sourceTable.contains(".")) {
-                sourceTableIdentifier = Identifier.fromString(sourceTable);
-            } else {
-                sourceTableIdentifier =
-                        Identifier.create(identifier.getDatabaseName(), 
sourceTable);
-            }
+            sourceAlias = sourceTable;
         } else {
-            tEnv.registerTable(sourceAlias, tEnv.sqlQuery(source));
-            sourceTableIdentifier = 
Identifier.create(identifier.getDatabaseName(), sourceAlias);
+            // NOTE: sql may change current catalog and database
+            try {
+                for (String sql : sourceSqls) {
+                    tEnv.executeSql(sql).await();
+                }
+            } catch (Throwable t) {
+                LOG.error("Error occurs when executing sql in --source-sql.", 
t);
+                throw new RuntimeException("Error occurs when executing sql in 
--source-sql.", t);
+            }
         }
 
         List<DataStream<RowData>> dataStreams =
@@ -465,7 +495,7 @@ public class MergeIntoAction extends ActionBase {
         List<String> project;
         // extract project
         if (matchedUpsertSet.equals("*")) {
-            project = 
Collections.singletonList(sourceTableIdentifier.getObjectName() + "." + "*");
+            project = Collections.singletonList(sourceAlias + "." + "*");
         } else {
             // validate upsert changes
             // no need to check primary keys changes because merge condition 
must contain all pks
@@ -488,10 +518,7 @@ public class MergeIntoAction extends ActionBase {
             // the table name is added before column name to avoid ambiguous 
column reference
             project =
                     targetFieldNames.stream()
-                            .map(
-                                    name ->
-                                            changes.getOrDefault(
-                                                    name, 
identifier.getObjectName() + "." + name))
+                            .map(name -> changes.getOrDefault(name, 
targetTableName() + "." + name))
                             .collect(Collectors.toList());
         }
 
@@ -500,8 +527,8 @@ public class MergeIntoAction extends ActionBase {
                 String.format(
                         "SELECT %s FROM %s INNER JOIN %s ON %s %s",
                         String.join(",", project),
-                        identifier.getEscapedFullName(),
-                        sourceTableIdentifier.getEscapedFullName(),
+                        escapedTargetName(),
+                        escapedSourceName(),
                         mergeCondition,
                         matchedUpsertCondition == null ? "" : "WHERE " + 
matchedUpsertCondition);
         LOG.info("Query used for matched-update:\n{}", query);
@@ -548,8 +575,8 @@ public class MergeIntoAction extends ActionBase {
                 String.format(
                         "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s 
WHERE %s) %s",
                         String.join(",", project),
-                        identifier.getEscapedFullName(),
-                        sourceTableIdentifier.getEscapedFullName(),
+                        escapedTargetName(),
+                        escapedSourceName(),
                         mergeCondition,
                         notMatchedBySourceUpsertCondition == null
                                 ? ""
@@ -571,7 +598,7 @@ public class MergeIntoAction extends ActionBase {
         // the table name is added before column name to avoid ambiguous 
column reference
         List<String> project =
                 targetFieldNames.stream()
-                        .map(name -> identifier.getObjectName() + "." + name)
+                        .map(name -> targetTableName() + "." + name)
                         .collect(Collectors.toList());
 
         // use inner join to find matched records
@@ -579,8 +606,8 @@ public class MergeIntoAction extends ActionBase {
                 String.format(
                         "SELECT %s FROM %s INNER JOIN %s ON %s %s",
                         String.join(",", project),
-                        identifier.getEscapedFullName(),
-                        sourceTableIdentifier.getEscapedFullName(),
+                        escapedTargetName(),
+                        escapedSourceName(),
                         mergeCondition,
                         matchedDeleteCondition == null ? "" : "WHERE " + 
matchedDeleteCondition);
         LOG.info("Query used by matched-delete:\n{}", query);
@@ -601,8 +628,8 @@ public class MergeIntoAction extends ActionBase {
                 String.format(
                         "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s 
WHERE %s) %s",
                         String.join(",", targetFieldNames),
-                        identifier.getEscapedFullName(),
-                        sourceTableIdentifier.getEscapedFullName(),
+                        escapedTargetName(),
+                        escapedSourceName(),
                         mergeCondition,
                         notMatchedBySourceDeleteCondition == null
                                 ? ""
@@ -625,8 +652,8 @@ public class MergeIntoAction extends ActionBase {
                 String.format(
                         "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s 
WHERE %s) %s",
                         notMatchedInsertValues,
-                        sourceTableIdentifier.getEscapedFullName(),
-                        identifier.getEscapedFullName(),
+                        escapedSourceName(),
+                        escapedTargetName(),
                         mergeCondition,
                         notMatchedInsertCondition == null
                                 ? ""
@@ -674,4 +701,19 @@ public class MergeIntoAction extends ActionBase {
                             return rowData;
                         });
     }
+
+    private String targetTableName() {
+        return targetAlias == null ? identifier.getObjectName() : targetAlias;
+    }
+
+    private String escapedTargetName() {
+        return String.format(
+                "`%s`.`%s`.`%s`", catalogName, identifier.getDatabaseName(), 
targetTableName());
+    }
+
+    private String escapedSourceName() {
+        return Arrays.stream(sourceAlias.split("\\."))
+                .map(s -> String.format("`%s`", s))
+                .collect(Collectors.joining("."));
+    }
 }
diff --git 
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
 
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
index 0cefe77f..27ef4dca 100644
--- 
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
+++ 
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.connector.action;
 
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.types.Row;
@@ -106,7 +107,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
         //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
         // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
-        action.withSourceTable("S")
+        action.withSourceTable(null, "S")
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
                 .withMatchedUpsert(
                         "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, 
last_action = 'matched_upsert'")
@@ -130,57 +131,79 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
     }
 
     @Test
-    public void testUsingSource() throws Exception {
+    public void testTargetAlias() throws Exception {
         // prepare table T
         prepareTable(CoreOptions.ChangelogProducer.NONE);
 
-        // similar to:
-        // MERGE INTO T
-        // USING (SELECT * FROM S WHERE k < 12) AS SS
-        // ON T.k = SS.k AND T.dt = SS.dt
-        // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE
-        //   SET v = SS.v, last_action = 'matched_upsert'
-        // WHEN MATCHED AND SS.v IS NULL THEN DELETE
-        // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt)
-        // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
-        //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
-        // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
+        // similar to:
+        // MERGE INTO T AS TT
+        // USING S
+        // ON TT.k = S.k AND TT.dt = S.dt
+        // WHEN MATCHED AND (S.v IS NULL) THEN DELETE
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
-        action.withSource("SELECT * FROM S WHERE k < 12", "SS")
-                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
-                .withMatchedUpsert(
-                        "T.v <> SS.v AND SS.v IS NOT NULL",
-                        "v = SS.v, last_action = 'matched_upsert'")
-                .withMatchedDelete("SS.v IS NULL")
-                .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt")
-                .withNotMatchedBySourceUpsert(
-                        "dt < '02-28'", "v = v || '_nmu', last_action = 
'not_matched_upsert'")
-                .withNotMatchedBySourceDelete("dt >= '02-28'");
+        action.withTargetAlias("TT")
+                .withSourceTable(null, "S")
+                .withMergeCondition("TT.k = S.k AND TT.dt = S.dt")
+                .withMatchedDelete("S.v IS NULL");
 
+        // Only the matched-delete action is configured, so the expectation is two deletions
+        // (k = 4 and k = 8) while every other row keeps its 'creation' state.
+        // NOTE(review): presumably the first list is the expected changelog and the second the
+        // expected final table content -- confirm against validateActionRunResult.
         validateActionRunResult(
                 action,
                 Arrays.asList(
-                        changelogRow("-U", 7, "v_7", "creation", "02-28"),
-                        changelogRow("+U", 7, "Seven", "matched_upsert", 
"02-28"),
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
-                        changelogRow("-D", 8, "v_8", "creation", "02-28"),
-                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
-                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
-                        changelogRow("-U", 2, "v_2", "creation", "02-27"),
-                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("-U", 3, "v_3", "creation", "02-27"),
-                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("-D", 5, "v_5", "creation", "02-28"),
-                        changelogRow("-D", 6, "v_6", "creation", "02-28"),
-                        changelogRow("-D", 9, "v_9", "creation", "02-28"),
-                        changelogRow("-D", 10, "v_10", "creation", "02-28")),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
-                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+U", 7, "Seven", "matched_upsert", 
"02-28"),
-                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
-                        changelogRow("+I", 11, "v_11", "insert", "02-29")));
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+    }
+
+    @Test
+    public void testUsingDdlSource() throws Exception {
+        // Checks that the merge source can be created by SQL statements handed to the action:
+        // a catalog is created and selected, then a temporary table S backed by the 'values'
+        // connector is declared and used as the source.
+
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        String catalog =
+                String.format(
+                        "CREATE CATALOG test_cat WITH ('type' = 'table-store', 'warehouse' = '%s')",
+                        getTempDirPath());
+        String useCatalog = "USE CATALOG test_cat";
+        // Register the source rows exactly once; the returned id is what the DDL's 'data-id'
+        // option points at. A second registration whose id is dropped would only leave an
+        // unreferenced data set behind in the factory.
+        String id =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(
+                                changelogRow("+I", 1, "v_1", "02-27"),
+                                changelogRow("+I", 4, null, "02-27"),
+                                changelogRow("+I", 8, null, "02-28")));
+        String ddl =
+                String.format(
+                        "CREATE TEMPORARY TABLE S (k INT, v STRING, dt STRING)\n"
+                                + "WITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s');",
+                        id);
+
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        // test current catalog and current database
+        // NOTE(review): the first argument is presumably the source table name and the remaining
+        // ones the statements run before the merge -- confirm against withSourceSqls.
+        action.withSourceSqls("S", catalog, useCatalog, ddl)
+                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+                .withMatchedDelete("S.v IS NULL");
+        // Source rows (4, 02-27) and (8, 02-28) carry a NULL v, so the matching rows of T are
+        // expected to be deleted; all other rows keep their 'creation' state.
+        validateActionRunResult(
+                action,
+                Arrays.asList(
+                        changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
     }
 
     @Test
@@ -190,7 +213,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         // build MergeIntoAction
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
-        action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+        action.withSourceSqls("SS", "CREATE TEMPORARY VIEW SS AS SELECT k, v, 
'unknown', dt FROM S")
                 .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
                 .withMatchedUpsert(null, "*");
 
@@ -225,7 +248,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         // build MergeIntoAction
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
-        action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+        action.withSourceSqls("SS", "CREATE TEMPORARY VIEW SS AS SELECT k, v, 
'unknown', dt FROM S")
                 .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
                 .withNotMatchedInsert("SS.k < 12", "*");
 
@@ -274,7 +297,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         // build MergeIntoAction
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
-        action.withSourceTable("S")
+        action.withSourceTable(null, "S")
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
                 .withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");
 


Reply via email to